summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--CHANGES.rst42
-rw-r--r--README.rst35
-rwxr-xr-xscripts/synapse_port_db3
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/auth.py89
-rw-r--r--synapse/app/client_reader.py4
-rw-r--r--synapse/app/federation_sender.py2
-rwxr-xr-xsynapse/app/homeserver.py50
-rw-r--r--synapse/app/media_repository.py4
-rw-r--r--synapse/app/synchrotron.py4
-rw-r--r--synapse/app/user_dir.py270
-rw-r--r--synapse/config/homeserver.py3
-rw-r--r--synapse/config/push.py45
-rw-r--r--synapse/config/server.py4
-rw-r--r--synapse/federation/transaction_queue.py1
-rw-r--r--synapse/handlers/auth.py21
-rw-r--r--synapse/handlers/device.py4
-rw-r--r--synapse/handlers/federation.py26
-rw-r--r--synapse/handlers/message.py9
-rw-r--r--synapse/handlers/room.py21
-rw-r--r--synapse/handlers/room_member.py10
-rw-r--r--synapse/handlers/typing.py12
-rw-r--r--synapse/handlers/user_directory.py296
-rw-r--r--synapse/http/server.py2
-rw-r--r--synapse/notifier.py3
-rw-r--r--synapse/push/httppusher.py2
-rw-r--r--synapse/replication/slave/storage/appservice.py5
-rw-r--r--synapse/replication/slave/storage/client_ips.py48
-rw-r--r--synapse/replication/slave/storage/events.py5
-rw-r--r--synapse/replication/tcp/client.py7
-rw-r--r--synapse/replication/tcp/commands.py32
-rw-r--r--synapse/replication/tcp/protocol.py6
-rw-r--r--synapse/replication/tcp/resource.py13
-rw-r--r--synapse/replication/tcp/streams.py22
-rw-r--r--synapse/rest/client/v1/admin.py141
-rw-r--r--synapse/rest/client/v2_alpha/user_directory.py8
-rw-r--r--synapse/rest/media/v1/download_resource.py9
-rw-r--r--synapse/rest/media/v1/filepath.py18
-rw-r--r--synapse/rest/media/v1/media_repository.py33
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py7
-rw-r--r--synapse/rest/media/v1/thumbnail_resource.py32
-rw-r--r--synapse/state.py77
-rw-r--r--synapse/storage/__init__.py39
-rw-r--r--synapse/storage/_base.py34
-rw-r--r--synapse/storage/appservice.py35
-rw-r--r--synapse/storage/client_ips.py132
-rw-r--r--synapse/storage/devices.py45
-rw-r--r--synapse/storage/directory.py14
-rw-r--r--synapse/storage/events.py131
-rw-r--r--synapse/storage/filtering.py12
-rw-r--r--synapse/storage/media_repository.py10
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/registration.py13
-rw-r--r--synapse/storage/room.py96
-rw-r--r--synapse/storage/roommember.py159
-rw-r--r--synapse/storage/schema/delta/42/device_list_last_id.sql33
-rw-r--r--synapse/storage/schema/delta/43/blocked_rooms.sql21
-rw-r--r--synapse/storage/schema/delta/43/quarantine_media.sql17
-rw-r--r--synapse/storage/schema/delta/43/url_cache.sql16
-rw-r--r--synapse/storage/schema/delta/43/user_share.sql33
-rw-r--r--synapse/storage/state.py74
-rw-r--r--synapse/storage/user_directory.py394
-rw-r--r--synapse/util/caches/__init__.py2
-rw-r--r--synapse/util/caches/descriptors.py5
-rw-r--r--synapse/util/caches/expiringcache.py3
-rw-r--r--synapse/util/caches/stream_change_cache.py14
-rw-r--r--tests/storage/test__base.py2
-rw-r--r--tests/storage/test_client_ips.py5
-rw-r--r--tests/storage/test_events.py115
-rw-r--r--tests/test_state.py2
70 files changed, 2262 insertions, 623 deletions
diff --git a/CHANGES.rst b/CHANGES.rst
index 82247fa528..ce356a11bd 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,45 @@
+Changes in synapse v0.22.0-rc1 (2017-06-26)
+===========================================
+
+Features:
+
+* Add a user directory API (PR #2252, and many more)
+* Add shutdown room API to remove room from local server (PR #2291)
+* Add API to quarantine media (PR #2292)
+* Add new config option to not send event contents to push servers (PR #2301)
+  Thanks to @cjdelisle!
+
+Changes:
+
+* Various performance fixes (PR #2177, #2233, #2230, #2238, #2248, #2256,
+  #2274)
+* Deduplicate sync filters (PR #2219) Thanks to @krombel!
+* Correct a typo in UPGRADE.rst (PR #2231) Thanks to @aaronraimist!
+* Add count of one time keys to sync stream (PR #2237)
+* Only store event_auth for state events (PR #2247)
+* Store URL cache preview downloads separately (PR #2299)
+
+Bug fixes:
+
+* Fix users not getting notifications when AS listened to that user_id (PR
+  #2216) Thanks to @slipeer!
+* Fix users without push set up not getting notifications after joining rooms
+  (PR #2236)
+* Fix preview url API to trim long descriptions (PR #2243)
+* Fix bug where we used cached but unpersisted state group as prev group,
+  resulting in broken state of restart (PR #2263)
+* Fix removing of pushers when using workers (PR #2267)
+* Fix CORS headers to allow Authorization header (PR #2285) Thanks to @krombel!
+
+
+Changes in synapse v0.21.1 (2017-06-15)
+=======================================
+
+Bug fixes:
+
+* Fix bug in anonymous usage statistic reporting (PR #2281)
+
+
 Changes in synapse v0.21.0 (2017-05-18)
 =======================================
 
diff --git a/README.rst b/README.rst
index 35141ac71b..4491b45181 100644
--- a/README.rst
+++ b/README.rst
@@ -359,7 +359,7 @@ https://www.archlinux.org/packages/community/any/matrix-synapse/, which should p
 the necessary dependencies. If the default web client is to be served (enabled by default in
 the generated config),
 https://www.archlinux.org/packages/community/any/python2-matrix-angular-sdk/ will also need to
-be installed. 
+be installed.
 
 Alternatively, to install using pip a few changes may be needed as ArchLinux
 defaults to python 3, but synapse currently assumes python 2.7 by default:
@@ -528,6 +528,30 @@ fix try re-installing from PyPI or directly from
     # Install from github
     pip install --user https://github.com/pyca/pynacl/tarball/master
 
+Running out of File Handles
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+If synapse runs out of filehandles, it typically fails badly - live-locking
+at 100% CPU, and/or failing to accept new TCP connections (blocking the
+connecting client).  Matrix currently can legitimately use a lot of file handles,
+thanks to busy rooms like #matrix:matrix.org containing hundreds of participating
+servers.  The first time a server talks in a room it will try to connect
+simultaneously to all participating servers, which could exhaust the available
+file descriptors between DNS queries & HTTPS sockets, especially if DNS is slow
+to respond.  (We need to improve the routing algorithm used to be better than
+full mesh, but as of June 2017 this hasn't happened yet).
+
+If you hit this failure mode, we recommend increasing the maximum number of
+open file handles to be at least 4096 (assuming a default of 1024 or 256).
+This is typically done by editing ``/etc/security/limits.conf``
+
+Separately, Synapse may leak file handles if inbound HTTP requests get stuck
+during processing - e.g. blocked behind a lock or talking to a remote server etc.
+This is best diagnosed by matching up the 'Received request' and 'Processed request'
+log lines and looking for any 'Processed request' lines which take more than
+a few seconds to execute.  Please let us know at #matrix-dev:matrix.org if
+you see this failure mode so we can help debug it, however.
+
 ArchLinux
 ~~~~~~~~~
 
@@ -875,12 +899,9 @@ cache a lot of recent room data and metadata in RAM in order to speed up
 common requests.  We'll improve this in future, but for now the easiest
 way to either reduce the RAM usage (at the risk of slowing things down)
 is to set the almost-undocumented ``SYNAPSE_CACHE_FACTOR`` environment
-variable.  Roughly speaking, a SYNAPSE_CACHE_FACTOR of 1.0 will max out
-at around 3-4GB of resident memory - this is what we currently run the
-matrix.org on.  The default setting is currently 0.1, which is probably
-around a ~700MB footprint.  You can dial it down further to 0.02 if
-desired, which targets roughly ~512MB.  Conversely you can dial it up if
-you need performance for lots of users and have a box with a lot of RAM.
+variable.  The default is 0.5, which can be decreased to reduce RAM usage
+in memory constrained enviroments, or increased if performance starts to
+degrade.
 
 
 .. _`key_management`: https://matrix.org/docs/spec/server_server/unstable.html#retrieving-server-keys
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 2e5d666707..7d158a46a4 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -41,6 +41,7 @@ BOOLEAN_COLUMNS = {
     "presence_stream": ["currently_active"],
     "public_room_list_stream": ["visibility"],
     "device_lists_outbound_pokes": ["sent"],
+    "users_who_share_rooms": ["share_private"],
 }
 
 
@@ -121,7 +122,7 @@ class Store(object):
                     try:
                         txn = conn.cursor()
                         return func(
-                            LoggingTransaction(txn, desc, self.database_engine, []),
+                            LoggingTransaction(txn, desc, self.database_engine, [], []),
                             *args, **kwargs
                         )
                     except self.database_engine.module.DatabaseError as e:
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 2a40bab3f0..6b0a766391 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.21.0"
+__version__ = "0.22.0-rc1"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 9dbc7993df..f8266d1c81 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -23,7 +23,8 @@ from synapse import event_auth
 from synapse.api.constants import EventTypes, Membership, JoinRules
 from synapse.api.errors import AuthError, Codes
 from synapse.types import UserID
-from synapse.util import logcontext
+from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
+from synapse.util.caches.lrucache import LruCache
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
@@ -39,6 +40,10 @@ AuthEventTypes = (
 GUEST_DEVICE_ID = "guest_device"
 
 
+class _InvalidMacaroonException(Exception):
+    pass
+
+
 class Auth(object):
     """
     FIXME: This class contains a mix of functions for authenticating users
@@ -51,6 +56,9 @@ class Auth(object):
         self.state = hs.get_state_handler()
         self.TOKEN_NOT_FOUND_HTTP_STATUS = 401
 
+        self.token_cache = LruCache(CACHE_SIZE_FACTOR * 10000)
+        register_cache("token_cache", self.token_cache)
+
     @defer.inlineCallbacks
     def check_from_context(self, event, context, do_sig_check=True):
         auth_events_ids = yield self.compute_auth_events(
@@ -144,17 +152,8 @@ class Auth(object):
     @defer.inlineCallbacks
     def check_host_in_room(self, room_id, host):
         with Measure(self.clock, "check_host_in_room"):
-            latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
-
-            logger.debug("calling resolve_state_groups from check_host_in_room")
-            entry = yield self.state.resolve_state_groups(
-                room_id, latest_event_ids
-            )
-
-            ret = yield self.store.is_host_joined(
-                room_id, host, entry.state_group, entry.state
-            )
-            defer.returnValue(ret)
+            latest_event_ids = yield self.store.is_host_joined(room_id, host)
+            defer.returnValue(latest_event_ids)
 
     def _check_joined_room(self, member, user_id, room_id):
         if not member or member.membership != Membership.JOIN:
@@ -209,7 +208,7 @@ class Auth(object):
                 default=[""]
             )[0]
             if user and access_token and ip_addr:
-                logcontext.preserve_fn(self.store.insert_client_ip)(
+                self.store.insert_client_ip(
                     user=user,
                     access_token=access_token,
                     ip=ip_addr,
@@ -276,8 +275,8 @@ class Auth(object):
             AuthError if no user by that token exists or the token is invalid.
         """
         try:
-            macaroon = pymacaroons.Macaroon.deserialize(token)
-        except Exception:  # deserialize can throw more-or-less anything
+            user_id, guest = self._parse_and_validate_macaroon(token, rights)
+        except _InvalidMacaroonException:
             # doesn't look like a macaroon: treat it as an opaque token which
             # must be in the database.
             # TODO: it would be nice to get rid of this, but apparently some
@@ -286,19 +285,8 @@ class Auth(object):
             defer.returnValue(r)
 
         try:
-            user_id = self.get_user_id_from_macaroon(macaroon)
             user = UserID.from_string(user_id)
 
-            self.validate_macaroon(
-                macaroon, rights, self.hs.config.expire_access_token,
-                user_id=user_id,
-            )
-
-            guest = False
-            for caveat in macaroon.caveats:
-                if caveat.caveat_id == "guest = true":
-                    guest = True
-
             if guest:
                 # Guest access tokens are not stored in the database (there can
                 # only be one access token per guest, anyway).
@@ -370,6 +358,55 @@ class Auth(object):
                 errcode=Codes.UNKNOWN_TOKEN
             )
 
+    def _parse_and_validate_macaroon(self, token, rights="access"):
+        """Takes a macaroon and tries to parse and validate it. This is cached
+        if and only if rights == access and there isn't an expiry.
+
+        On invalid macaroon raises _InvalidMacaroonException
+
+        Returns:
+            (user_id, is_guest)
+        """
+        if rights == "access":
+            cached = self.token_cache.get(token, None)
+            if cached:
+                return cached
+
+        try:
+            macaroon = pymacaroons.Macaroon.deserialize(token)
+        except Exception:  # deserialize can throw more-or-less anything
+            # doesn't look like a macaroon: treat it as an opaque token which
+            # must be in the database.
+            # TODO: it would be nice to get rid of this, but apparently some
+            # people use access tokens which aren't macaroons
+            raise _InvalidMacaroonException()
+
+        try:
+            user_id = self.get_user_id_from_macaroon(macaroon)
+
+            has_expiry = False
+            guest = False
+            for caveat in macaroon.caveats:
+                if caveat.caveat_id.startswith("time "):
+                    has_expiry = True
+                elif caveat.caveat_id == "guest = true":
+                    guest = True
+
+            self.validate_macaroon(
+                macaroon, rights, self.hs.config.expire_access_token,
+                user_id=user_id,
+            )
+        except (pymacaroons.exceptions.MacaroonException, TypeError, ValueError):
+            raise AuthError(
+                self.TOKEN_NOT_FOUND_HTTP_STATUS, "Invalid macaroon passed.",
+                errcode=Codes.UNKNOWN_TOKEN
+            )
+
+        if not has_expiry and rights == "access":
+            self.token_cache[token] = (user_id, guest)
+
+        return user_id, guest
+
     def get_user_id_from_macaroon(self, macaroon):
         """Retrieve the user_id given by the caveats on the macaroon.
 
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 9b72c649ac..09bc1935f1 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -24,6 +24,7 @@ from synapse.http.server import JsonResource
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.room import RoomStore
@@ -33,7 +34,6 @@ from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import PublicRoomListRestServlet
 from synapse.server import HomeServer
-from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
@@ -65,8 +65,8 @@ class ClientReaderSlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
     TransactionStore,
+    SlavedClientIpStore,
     BaseSlavedStore,
-    ClientIpStore,  # After BaseSlavedStore because the constructor is different
 ):
     pass
 
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index e51a69074d..03327dc47a 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -51,7 +51,7 @@ import sys
 import logging
 import gc
 
-logger = logging.getLogger("synapse.app.appservice")
+logger = logging.getLogger("synapse.app.federation_sender")
 
 
 class FederationSenderSlaveStore(
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 3457402596..081e7cce59 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -35,7 +35,7 @@ from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_d
 
 from synapse.server import HomeServer
 
-from twisted.internet import reactor, task, defer
+from twisted.internet import reactor, defer
 from twisted.application import service
 from twisted.web.resource import Resource, EncodingResourceWrapper
 from twisted.web.static import File
@@ -53,7 +53,7 @@ from synapse.api.urls import (
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
-from synapse.metrics import register_memory_metrics, get_metrics_for
+from synapse.metrics import register_memory_metrics
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.federation.transport.server import TransportLayerServer
@@ -398,7 +398,8 @@ def run(hs):
         ThreadPool._worker = profile(ThreadPool._worker)
         reactor.run = profile(reactor.run)
 
-    start_time = hs.get_clock().time()
+    clock = hs.get_clock()
+    start_time = clock.time()
 
     stats = {}
 
@@ -410,41 +411,23 @@ def run(hs):
         if uptime < 0:
             uptime = 0
 
-        # If the stats directory is empty then this is the first time we've
-        # reported stats.
-        first_time = not stats
-
         stats["homeserver"] = hs.config.server_name
         stats["timestamp"] = now
         stats["uptime_seconds"] = uptime
         stats["total_users"] = yield hs.get_datastore().count_all_users()
 
+        total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
+        stats["total_nonbridged_users"] = total_nonbridged_users
+
         room_count = yield hs.get_datastore().get_room_count()
         stats["total_room_count"] = room_count
 
         stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
-        daily_messages = yield hs.get_datastore().count_daily_messages()
-        if daily_messages is not None:
-            stats["daily_messages"] = daily_messages
-        else:
-            stats.pop("daily_messages", None)
-
-        if first_time:
-            # Add callbacks to report the synapse stats as metrics whenever
-            # prometheus requests them, typically every 30s.
-            # As some of the stats are expensive to calculate we only update
-            # them when synapse phones home to matrix.org every 24 hours.
-            metrics = get_metrics_for("synapse.usage")
-            metrics.add_callback("timestamp", lambda: stats["timestamp"])
-            metrics.add_callback("uptime_seconds", lambda: stats["uptime_seconds"])
-            metrics.add_callback("total_users", lambda: stats["total_users"])
-            metrics.add_callback("total_room_count", lambda: stats["total_room_count"])
-            metrics.add_callback(
-                "daily_active_users", lambda: stats["daily_active_users"]
-            )
-            metrics.add_callback(
-                "daily_messages", lambda: stats.get("daily_messages", 0)
-            )
+        stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
+        stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
+
+        daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
+        stats["daily_sent_messages"] = daily_sent_messages
 
         logger.info("Reporting stats to matrix.org: %s" % (stats,))
         try:
@@ -456,9 +439,12 @@ def run(hs):
             logger.warn("Error reporting stats: %s", e)
 
     if hs.config.report_stats:
-        phone_home_task = task.LoopingCall(phone_stats_home)
-        logger.info("Scheduling stats reporting for 24 hour intervals")
-        phone_home_task.start(60 * 60 * 24, now=False)
+        logger.info("Scheduling stats reporting for 3 hour intervals")
+        clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
+
+        # We wait 5 minutes to send the first set of stats as the server can
+        # be quite busy the first few minutes
+        clock.call_later(5 * 60, phone_stats_home)
 
     def in_thread():
         # Uncomment to enable tracing of log context changes.
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 26c4416956..f57ec784fe 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -23,13 +23,13 @@ from synapse.http.site import SynapseSite
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.rest.media.v1.media_repository import MediaRepositoryResource
 from synapse.server import HomeServer
-from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.engines import create_engine
 from synapse.storage.media_repository import MediaRepositoryStore
 from synapse.util.httpresourcetree import create_resource_tree
@@ -60,10 +60,10 @@ logger = logging.getLogger("synapse.app.media_repository")
 class MediaRepositorySlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
+    SlavedClientIpStore,
     TransactionStore,
     BaseSlavedStore,
     MediaRepositoryStore,
-    ClientIpStore,
 ):
     pass
 
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 13c00ef2ba..4bdd99a966 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -29,6 +29,7 @@ from synapse.rest.client.v1 import events
 from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
 from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
@@ -42,7 +43,6 @@ from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
-from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
@@ -77,9 +77,9 @@ class SynchrotronSlavedStore(
     SlavedPresenceStore,
     SlavedDeviceInboxStore,
     SlavedDeviceStore,
+    SlavedClientIpStore,
     RoomStore,
     BaseSlavedStore,
-    ClientIpStore,  # After BaseSlavedStore because the constructor is different
 ):
     who_forgot_in_room = (
         RoomMemberStore.__dict__["who_forgot_in_room"]
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
new file mode 100644
index 0000000000..8c6300db9d
--- /dev/null
+++ b/synapse/app/user_dir.py
@@ -0,0 +1,270 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.server import HomeServer
+from synapse.config._base import ConfigError
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
+from synapse.crypto import context_factory
+from synapse.http.site import SynapseSite
+from synapse.http.server import JsonResource
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v2_alpha import user_directory
+from synapse.storage.engines import create_engine
+from synapse.storage.user_directory import UserDirectoryStore
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.util.caches.stream_change_cache import StreamChangeCache
+
+from synapse import events
+
+from twisted.internet import reactor
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.user_dir")
+
+
+class UserDirectorySlaveStore(
+    SlavedEventStore,
+    SlavedApplicationServiceStore,
+    SlavedRegistrationStore,
+    SlavedClientIpStore,
+    UserDirectoryStore,
+    BaseSlavedStore,
+):
+    def __init__(self, db_conn, hs):
+        super(UserDirectorySlaveStore, self).__init__(db_conn, hs)
+
+        events_max = self._stream_id_gen.get_current_token()
+        curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
+            db_conn, "current_state_delta_stream",
+            entity_column="room_id",
+            stream_column="stream_id",
+            max_value=events_max,  # As we share the stream id with events token
+            limit=1000,
+        )
+        self._curr_state_delta_stream_cache = StreamChangeCache(
+            "_curr_state_delta_stream_cache", min_curr_state_delta_id,
+            prefilled_cache=curr_state_delta_prefill,
+        )
+
+        self._current_state_delta_pos = events_max
+
+    def stream_positions(self):
+        result = super(UserDirectorySlaveStore, self).stream_positions()
+        result["current_state_deltas"] = self._current_state_delta_pos
+        return result
+
+    def process_replication_rows(self, stream_name, token, rows):
+        if stream_name == "current_state_deltas":
+            self._current_state_delta_pos = token
+            for row in rows:
+                self._curr_state_delta_stream_cache.entity_has_changed(
+                    row.room_id, token
+                )
+        return super(UserDirectorySlaveStore, self).process_replication_rows(
+            stream_name, token, rows
+        )
+
+
+class UserDirectoryServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_addresses = listener_config["bind_addresses"]
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+                elif name == "client":
+                    resource = JsonResource(self, canonical_json=False)
+                    user_directory.register_servlets(self, resource)
+                    resources.update({
+                        "/_matrix/client/r0": resource,
+                        "/_matrix/client/unstable": resource,
+                        "/_matrix/client/v2_alpha": resource,
+                        "/_matrix/client/api/v1": resource,
+                    })
+
+        root_resource = create_resource_tree(resources, Resource())
+
+        for address in bind_addresses:
+            reactor.listenTCP(
+                port,
+                SynapseSite(
+                    "synapse.access.http.%s" % (site_tag,),
+                    site_tag,
+                    listener_config,
+                    root_resource,
+                ),
+                interface=address
+            )
+
+        logger.info("Synapse user_dir now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                bind_addresses = listener["bind_addresses"]
+
+                for address in bind_addresses:
+                    reactor.listenTCP(
+                        listener["port"],
+                        manhole(
+                            username="matrix",
+                            password="rabbithole",
+                            globals={"hs": self},
+                        ),
+                        interface=address
+                    )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return UserDirectoryReplicationHandler(self)
+
+
+class UserDirectoryReplicationHandler(ReplicationClientHandler):
+    def __init__(self, hs):
+        super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
+        self.user_directory = hs.get_user_directory_handler()
+
+    def on_rdata(self, stream_name, token, rows):
+        super(UserDirectoryReplicationHandler, self).on_rdata(
+            stream_name, token, rows
+        )
+        if stream_name == "current_state_deltas":
+            preserve_fn(self.user_directory.notify_new_event)()
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse user directory", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.user_dir"
+
+    setup_logging(config, use_worker_options=True)
+
+    events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    database_engine = create_engine(config.database_config)
+
+    if config.update_user_directory:
+        sys.stderr.write(
+            "\nThe update_user_directory must be disabled in the main synapse process"
+            "\nbefore they can be run in a separate worker."
+            "\nPlease add ``update_user_directory: false`` to the main config"
+            "\n"
+        )
+        sys.exit(1)
+
+    # Force the pushers to start since they will be disabled in the main config
+    config.update_user_directory = True
+
+    tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+    ps = UserDirectoryServer(
+        config.server_name,
+        db_config=config.database_config,
+        tls_server_context_factory=tls_server_context_factory,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ps.setup()
+    ps.start_listening(config.worker_listeners)
+
+    def run():
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ps.get_datastore().start_profiling()
+        ps.get_state_handler().start_caching()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-user-dir",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 0f890fc04a..b22cacf8dc 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -33,6 +33,7 @@ from .jwt import JWTConfig
 from .password_auth_providers import PasswordAuthProviderConfig
 from .emailconfig import EmailConfig
 from .workers import WorkerConfig
+from .push import PushConfig
 
 
 class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
@@ -40,7 +41,7 @@ class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
                        VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig,
                        AppServiceConfig, KeyConfig, SAML2Config, CasConfig,
                        JWTConfig, PasswordConfig, EmailConfig,
-                       WorkerConfig, PasswordAuthProviderConfig,):
+                       WorkerConfig, PasswordAuthProviderConfig, PushConfig,):
     pass
 
 
diff --git a/synapse/config/push.py b/synapse/config/push.py
new file mode 100644
index 0000000000..9c68318b40
--- /dev/null
+++ b/synapse/config/push.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+
+class PushConfig(Config):
+    def read_config(self, config):
+        self.push_redact_content = False
+
+        push_config = config.get("email", {})
+        self.push_redact_content = push_config.get("redact_content", False)
+
+    def default_config(self, config_dir_path, server_name, **kwargs):
+        return """
+        # Control how push messages are sent to google/apple to notifications.
+        # Normally every message said in a room with one or more people using
+        # mobile devices will be posted to a push server hosted by matrix.org
+        # which is registered with google and apple in order to allow push
+        # notifications to be sent to these mobile devices.
+        #
+        # Setting redact_content to true will make the push messages contain no
+        # message content which will provide increased privacy. This is a
+        # temporary solution pending improvements to Android and iPhone apps
+        # to get content from the app rather than the notification.
+        #
+        # For modern android devices the notification content will still appear
+        # because it is loaded by the app. iPhone, however will send a
+        # notification saying only that a message arrived and who it came from.
+        #
+        #push:
+        #   redact_content: false
+        """
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 3910b9dc31..28b4e5f50c 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -35,6 +35,10 @@ class ServerConfig(Config):
         # "disable" federation
         self.send_federation = config.get("send_federation", True)
 
+        # Whether to update the user directory or not. This should be set to
+        # false only if we are updating the user directory in a worker
+        self.update_user_directory = config.get("update_user_directory", True)
+
         self.filter_timeline_limit = config.get("filter_timeline_limit", -1)
 
         if self.public_baseurl is not None:
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index a15198e05d..003eaba893 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -187,6 +187,7 @@ class TransactionQueue(object):
                             prev_id for prev_id, _ in event.prev_events
                         ],
                     )
+                    destinations = set(destinations)
 
                     if send_on_behalf_of is not None:
                         # If we are sending the event on behalf of another server
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index e7a1bb7246..b00446bec0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -21,6 +21,7 @@ from synapse.api.constants import LoginType
 from synapse.types import UserID
 from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError
 from synapse.util.async import run_on_reactor
+from synapse.util.caches.expiringcache import ExpiringCache
 
 from twisted.web.client import PartialDownloadError
 
@@ -52,7 +53,15 @@ class AuthHandler(BaseHandler):
             LoginType.DUMMY: self._check_dummy_auth,
         }
         self.bcrypt_rounds = hs.config.bcrypt_rounds
-        self.sessions = {}
+
+        # This is not a cache per se, but a store of all current sessions that
+        # expire after N hours
+        self.sessions = ExpiringCache(
+            cache_name="register_sessions",
+            clock=hs.get_clock(),
+            expiry_ms=self.SESSION_EXPIRE_MS,
+            reset_expiry_on_get=True,
+        )
 
         account_handler = _AccountHandler(
             hs, check_user_exists=self.check_user_exists
@@ -617,16 +626,6 @@ class AuthHandler(BaseHandler):
         logger.debug("Saving session %s", session)
         session["last_used"] = self.hs.get_clock().time_msec()
         self.sessions[session["id"]] = session
-        self._prune_sessions()
-
-    def _prune_sessions(self):
-        for sid, sess in self.sessions.items():
-            last_used = 0
-            if 'last_used' in sess:
-                last_used = sess['last_used']
-            now = self.hs.get_clock().time_msec()
-            if last_used < now - AuthHandler.SESSION_EXPIRE_MS:
-                del self.sessions[sid]
 
     def hash(self, password):
         """Computes a secure hash of password.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 982cda3edf..ed60d494ff 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -106,7 +106,7 @@ class DeviceHandler(BaseHandler):
         device_map = yield self.store.get_devices_by_user(user_id)
 
         ips = yield self.store.get_last_client_ip_by_device(
-            devices=((user_id, device_id) for device_id in device_map.keys())
+            user_id, device_id=None
         )
 
         devices = device_map.values()
@@ -133,7 +133,7 @@ class DeviceHandler(BaseHandler):
         except errors.StoreError:
             raise errors.NotFoundError
         ips = yield self.store.get_last_client_ip_by_device(
-            devices=((user_id, device_id),)
+            user_id, device_id,
         )
         _update_device_from_client_ips(device, ips)
         defer.returnValue(device)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a333acc4aa..483cb8eac6 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -75,6 +75,7 @@ class FederationHandler(BaseHandler):
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
         self.action_generator = hs.get_action_generator()
+        self.is_mine_id = hs.is_mine_id
 
         self.replication_layer.set_handler(self)
 
@@ -1068,6 +1069,24 @@ class FederationHandler(BaseHandler):
         """
         event = pdu
 
+        is_blocked = yield self.store.is_room_blocked(event.room_id)
+        if is_blocked:
+            raise SynapseError(403, "This room has been blocked on this server")
+
+        membership = event.content.get("membership")
+        if event.type != EventTypes.Member or membership != Membership.INVITE:
+            raise SynapseError(400, "The event was not an m.room.member invite event")
+
+        sender_domain = get_domain_from_id(event.sender)
+        if sender_domain != origin:
+            raise SynapseError(400, "The invite event was not from the server sending it")
+
+        if event.state_key is None:
+            raise SynapseError(400, "The invite event did not have a state key")
+
+        if not self.is_mine_id(event.state_key):
+            raise SynapseError(400, "The invite event must be for this server")
+
         event.internal_metadata.outlier = True
         event.internal_metadata.invite_from_remote = True
 
@@ -1102,6 +1121,9 @@ class FederationHandler(BaseHandler):
             user_id,
             "leave"
         )
+        # Mark as outlier as we don't have any state for this event; we're not
+        # even in the room.
+        event.internal_metadata.outlier = True
         event = self._sign_event(event)
 
         # Try the host that we succesfully called /make_leave/ on first for
@@ -1273,7 +1295,7 @@ class FederationHandler(BaseHandler):
             for event in res:
                 # We sign these again because there was a bug where we
                 # incorrectly signed things the first time round
-                if self.hs.is_mine_id(event.event_id):
+                if self.is_mine_id(event.event_id):
                     event.signatures.update(
                         compute_event_signature(
                             event,
@@ -1346,7 +1368,7 @@ class FederationHandler(BaseHandler):
         )
 
         if event:
-            if self.hs.is_mine_id(event.event_id):
+            if self.is_mine_id(event.event_id):
                 # FIXME: This is a temporary work around where we occasionally
                 # return events slightly differently than when they were
                 # originally signed
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a04f634c5c..24c9ffdb20 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -34,6 +34,7 @@ from canonicaljson import encode_canonical_json
 
 import logging
 import random
+import ujson
 
 logger = logging.getLogger(__name__)
 
@@ -498,6 +499,14 @@ class MessageHandler(BaseHandler):
             logger.warn("Denying new event %r because %s", event, err)
             raise err
 
+        # Ensure that we can round trip before trying to persist in db
+        try:
+            dump = ujson.dumps(event.content)
+            ujson.loads(dump)
+        except:
+            logger.exception("Failed to encode content: %r", event.content)
+            raise
+
         yield self.maybe_kick_guest_users(event, context)
 
         if event.type == EventTypes.CanonicalAlias:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index d2a0d6520a..5698d28088 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -61,7 +61,7 @@ class RoomCreationHandler(BaseHandler):
     }
 
     @defer.inlineCallbacks
-    def create_room(self, requester, config):
+    def create_room(self, requester, config, ratelimit=True):
         """ Creates a new room.
 
         Args:
@@ -75,7 +75,8 @@ class RoomCreationHandler(BaseHandler):
         """
         user_id = requester.user.to_string()
 
-        yield self.ratelimit(requester)
+        if ratelimit:
+            yield self.ratelimit(requester)
 
         if "room_alias_name" in config:
             for wchar in string.whitespace:
@@ -167,6 +168,7 @@ class RoomCreationHandler(BaseHandler):
             initial_state=initial_state,
             creation_content=creation_content,
             room_alias=room_alias,
+            power_level_content_override=config.get("power_level_content_override", {})
         )
 
         if "name" in config:
@@ -245,7 +247,8 @@ class RoomCreationHandler(BaseHandler):
             invite_list,
             initial_state,
             creation_content,
-            room_alias
+            room_alias,
+            power_level_content_override,
     ):
         def create(etype, content, **kwargs):
             e = {
@@ -291,7 +294,15 @@ class RoomCreationHandler(BaseHandler):
             ratelimit=False,
         )
 
-        if (EventTypes.PowerLevels, '') not in initial_state:
+        # We treat the power levels override specially as this needs to be one
+        # of the first events that get sent into a room.
+        pl_content = initial_state.pop((EventTypes.PowerLevels, ''), None)
+        if pl_content is not None:
+            yield send(
+                etype=EventTypes.PowerLevels,
+                content=pl_content,
+            )
+        else:
             power_level_content = {
                 "users": {
                     creator_id: 100,
@@ -316,6 +327,8 @@ class RoomCreationHandler(BaseHandler):
                 for invitee in invite_list:
                     power_level_content["users"][invitee] = 100
 
+            power_level_content.update(power_level_content_override)
+
             yield send(
                 etype=EventTypes.PowerLevels,
                 content=power_level_content,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 1ca88517a2..b3f979b246 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -203,6 +203,11 @@ class RoomMemberHandler(BaseHandler):
         if not remote_room_hosts:
             remote_room_hosts = []
 
+        if effective_membership_state not in ("leave", "ban",):
+            is_blocked = yield self.store.is_room_blocked(room_id)
+            if is_blocked:
+                raise SynapseError(403, "This room has been blocked on this server")
+
         latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
         current_state_ids = yield self.state_handler.get_current_state_ids(
             room_id, latest_event_ids=latest_event_ids,
@@ -369,6 +374,11 @@ class RoomMemberHandler(BaseHandler):
                     # so don't really fit into the general auth process.
                     raise AuthError(403, "Guest access not allowed")
 
+        if event.membership not in (Membership.LEAVE, Membership.BAN):
+            is_blocked = yield self.store.is_room_blocked(room_id)
+            if is_blocked:
+                raise SynapseError(403, "This room has been blocked on this server")
+
         yield message_handler.handle_new_client_event(
             requester,
             event,
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 3b7818af5c..82dedbbc99 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -89,7 +89,7 @@ class TypingHandler(object):
             until = self._member_typing_until.get(member, None)
             if not until or until <= now:
                 logger.info("Timing out typing for: %s", member.user_id)
-                preserve_fn(self._stopped_typing)(member)
+                self._stopped_typing(member)
                 continue
 
             # Check if we need to resend a keep alive over federation for this
@@ -147,7 +147,7 @@ class TypingHandler(object):
             # No point sending another notification
             defer.returnValue(None)
 
-        yield self._push_update(
+        self._push_update(
             member=member,
             typing=True,
         )
@@ -171,7 +171,7 @@ class TypingHandler(object):
 
         member = RoomMember(room_id=room_id, user_id=target_user_id)
 
-        yield self._stopped_typing(member)
+        self._stopped_typing(member)
 
     @defer.inlineCallbacks
     def user_left_room(self, user, room_id):
@@ -180,7 +180,6 @@ class TypingHandler(object):
             member = RoomMember(room_id=room_id, user_id=user_id)
             yield self._stopped_typing(member)
 
-    @defer.inlineCallbacks
     def _stopped_typing(self, member):
         if member.user_id not in self._room_typing.get(member.room_id, set()):
             # No point
@@ -189,16 +188,15 @@ class TypingHandler(object):
         self._member_typing_until.pop(member, None)
         self._member_last_federation_poke.pop(member, None)
 
-        yield self._push_update(
+        self._push_update(
             member=member,
             typing=False,
         )
 
-    @defer.inlineCallbacks
     def _push_update(self, member, typing):
         if self.hs.is_mine_id(member.user_id):
             # Only send updates for changes to our own users.
-            yield self._push_remote(member, typing)
+            preserve_fn(self._push_remote)(member, typing)
 
         self._push_update_local(
             member=member,
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 0182cf86d6..2a49456bfc 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -14,12 +14,12 @@
 # limitations under the License.
 
 import logging
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.storage.roommember import ProfileInfo
 from synapse.util.metrics import Measure
+from synapse.util.async import sleep
 
 
 logger = logging.getLogger(__name__)
@@ -41,28 +41,41 @@ class UserDirectoyHandler(object):
     one public room.
     """
 
+    INITIAL_SLEEP_MS = 50
+    INITIAL_SLEEP_COUNT = 100
+    INITIAL_BATCH_SIZE = 100
+
     def __init__(self, hs):
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
         self.server_name = hs.hostname
         self.clock = hs.get_clock()
+        self.notifier = hs.get_notifier()
+        self.is_mine_id = hs.is_mine_id
+        self.update_user_directory = hs.config.update_user_directory
 
         # When start up for the first time we need to populate the user_directory.
         # This is a set of user_id's we've inserted already
         self.initially_handled_users = set()
         self.initially_handled_users_in_public = set()
 
+        self.initially_handled_users_share = set()
+        self.initially_handled_users_share_private_room = set()
+
         # The current position in the current_state_delta stream
         self.pos = None
 
         # Guard to ensure we only process deltas one at a time
         self._is_processing = False
 
-        # We kick this off so that we don't have to wait for a change before
-        # we start populating the user directory
-        self.clock.call_later(0, self.notify_new_event)
+        if self.update_user_directory:
+            self.notifier.add_replication_callback(self.notify_new_event)
 
-    def search_users(self, search_term, limit):
+            # We kick this off so that we don't have to wait for a change before
+            # we start populating the user directory
+            self.clock.call_later(0, self.notify_new_event)
+
+    def search_users(self, user_id, search_term, limit):
         """Searches for users in directory
 
         Returns:
@@ -79,12 +92,15 @@ class UserDirectoyHandler(object):
                     ]
                 }
         """
-        return self.store.search_user_dir(search_term, limit)
+        return self.store.search_user_dir(user_id, search_term, limit)
 
     @defer.inlineCallbacks
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
+        if not self.update_user_directory:
+            return
+
         if self._is_processing:
             return
 
@@ -112,6 +128,7 @@ class UserDirectoyHandler(object):
                 if not deltas:
                     return
 
+                logger.info("Handling %d state deltas", len(deltas))
                 yield self._handle_deltas(deltas)
 
                 self.pos = deltas[-1]["stream_id"]
@@ -130,10 +147,21 @@ class UserDirectoyHandler(object):
         # We process by going through each existing room at a time.
         room_ids = yield self.store.get_all_rooms()
 
+        logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
+        num_processed_rooms = 1
+
         for room_id in room_ids:
+            logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids))
             yield self._handle_intial_room(room_id)
+            num_processed_rooms += 1
+            yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+
+        logger.info("Processed all rooms.")
 
         self.initially_handled_users = None
+        self.initially_handled_users_in_public = None
+        self.initially_handled_users_share = None
+        self.initially_handled_users_share_private_room = None
 
         yield self.store.update_user_directory_stream_pos(new_pos)
 
@@ -141,14 +169,15 @@ class UserDirectoyHandler(object):
     def _handle_intial_room(self, room_id):
         """Called when we initially fill out user_directory one room at a time
         """
-        is_in_room = yield self.state.get_is_host_in_room(room_id, self.server_name)
+        is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
         if not is_in_room:
             return
 
         is_public = yield self.store.is_room_world_readable_or_publicly_joinable(room_id)
 
         users_with_profile = yield self.state.get_current_user_in_room(room_id)
-        unhandled_users = set(users_with_profile) - self.initially_handled_users
+        user_ids = set(users_with_profile)
+        unhandled_users = user_ids - self.initially_handled_users
 
         yield self.store.add_profiles_to_user_dir(
             room_id, {
@@ -161,9 +190,76 @@ class UserDirectoyHandler(object):
         if is_public:
             yield self.store.add_users_to_public_room(
                 room_id,
-                user_ids=unhandled_users - self.initially_handled_users_in_public
+                user_ids=user_ids - self.initially_handled_users_in_public
             )
-            self.initially_handled_users_in_public != unhandled_users
+            self.initially_handled_users_in_public |= user_ids
+
+        # We now go and figure out the new users who share rooms with user entries
+        # We sleep aggressively here as otherwise it can starve resources.
+        # We also batch up inserts/updates, but try to avoid too many at once.
+        to_insert = set()
+        to_update = set()
+        count = 0
+        for user_id in user_ids:
+            if count % self.INITIAL_SLEEP_COUNT == 0:
+                yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+
+            if not self.is_mine_id(user_id):
+                count += 1
+                continue
+
+            if self.store.get_if_app_services_interested_in_user(user_id):
+                count += 1
+                continue
+
+            for other_user_id in user_ids:
+                if user_id == other_user_id:
+                    continue
+
+                if count % self.INITIAL_SLEEP_COUNT == 0:
+                    yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+                count += 1
+
+                user_set = (user_id, other_user_id)
+
+                if user_set in self.initially_handled_users_share_private_room:
+                    continue
+
+                if user_set in self.initially_handled_users_share:
+                    if is_public:
+                        continue
+                    to_update.add(user_set)
+                else:
+                    to_insert.add(user_set)
+
+                if is_public:
+                    self.initially_handled_users_share.add(user_set)
+                else:
+                    self.initially_handled_users_share_private_room.add(user_set)
+
+                if len(to_insert) > self.INITIAL_BATCH_SIZE:
+                    yield self.store.add_users_who_share_room(
+                        room_id, not is_public, to_insert,
+                    )
+                    to_insert.clear()
+
+                if len(to_update) > self.INITIAL_BATCH_SIZE:
+                    yield self.store.update_users_who_share_room(
+                        room_id, not is_public, to_update,
+                    )
+                    to_update.clear()
+
+        if to_insert:
+            yield self.store.add_users_who_share_room(
+                room_id, not is_public, to_insert,
+            )
+            to_insert.clear()
+
+        if to_update:
+            yield self.store.update_users_who_share_room(
+                room_id, not is_public, to_update,
+            )
+            to_update.clear()
 
     @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
@@ -193,17 +289,19 @@ class UserDirectoyHandler(object):
 
                 if change is None:
                     # Handle any profile changes
-                    yield self._handle_profile_change(state_key, prev_event_id, event_id)
+                    yield self._handle_profile_change(
+                        state_key, room_id, prev_event_id, event_id,
+                    )
                     continue
 
                 if not change:
                     # Need to check if the server left the room entirely, if so
                     # we might need to remove all the users in that room
-                    is_in_room = yield self.state.get_is_host_in_room(
+                    is_in_room = yield self.store.is_host_joined(
                         room_id, self.server_name,
                     )
                     if not is_in_room:
-                        logger.debug("Server left room: %r", room_id)
+                        logger.info("Server left room: %r", room_id)
                         # Fetch all the users that we marked as being in user
                         # directory due to being in the room and then check if
                         # need to remove those users or not
@@ -215,7 +313,7 @@ class UserDirectoyHandler(object):
                         logger.debug("Server is still in room: %r", room_id)
 
                 if change:  # The user joined
-                    event = yield self.store.get_event(event_id)
+                    event = yield self.store.get_event(event_id, allow_none=True)
                     profile = ProfileInfo(
                         avatar_url=event.content.get("avatar_url"),
                         display_name=event.content.get("displayname"),
@@ -238,7 +336,7 @@ class UserDirectoyHandler(object):
             event_id (str|None): The new event after the state change
             typ (str): Type of the event
         """
-        logger.debug("Handling change for %s", typ)
+        logger.debug("Handling change for %s: %s", typ, room_id)
 
         if typ == EventTypes.RoomHistoryVisibility:
             change = yield self._get_key_change(
@@ -304,12 +402,84 @@ class UserDirectoyHandler(object):
             room_id
         )
 
-        if not is_public:
-            return
+        if is_public:
+            row = yield self.store.get_user_in_public_room(user_id)
+            if not row:
+                yield self.store.add_users_to_public_room(room_id, [user_id])
+        else:
+            logger.debug("Not adding user to public dir, %r", user_id)
 
-        row = yield self.store.get_user_in_public_room(user_id)
-        if not row:
-            yield self.store.add_users_to_public_room(room_id, [user_id])
+        # Now we update users who share rooms with users. We do this by getting
+        # all the current users in the room and seeing which aren't already
+        # marked in the database as sharing with `user_id`
+
+        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+
+        to_insert = set()
+        to_update = set()
+
+        is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+
+        # First, if they're our user then we need to update for every user
+        if self.is_mine_id(user_id) and not is_appservice:
+            # Returns a map of other_user_id -> shared_private. We only need
+            # to update mappings if for users that either don't share a room
+            # already (aren't in the map) or, if the room is private, those that
+            # only share a public room.
+            user_ids_shared = yield self.store.get_users_who_share_room_from_dir(
+                user_id
+            )
+
+            for other_user_id in users_with_profile:
+                if user_id == other_user_id:
+                    continue
+
+                shared_is_private = user_ids_shared.get(other_user_id)
+                if shared_is_private is True:
+                    # We've already marked in the database they share a private room
+                    continue
+                elif shared_is_private is False:
+                    # They already share a public room, so only update if this is
+                    # a private room
+                    if not is_public:
+                        to_update.add((user_id, other_user_id))
+                elif shared_is_private is None:
+                    # This is the first time they both share a room
+                    to_insert.add((user_id, other_user_id))
+
+        # Next we need to update for every local user in the room
+        for other_user_id in users_with_profile:
+            if user_id == other_user_id:
+                continue
+
+            is_appservice = self.store.get_if_app_services_interested_in_user(
+                other_user_id
+            )
+            if self.is_mine_id(other_user_id) and not is_appservice:
+                shared_is_private = yield self.store.get_if_users_share_a_room(
+                    other_user_id, user_id,
+                )
+                if shared_is_private is True:
+                    # We've already marked in the database they share a private room
+                    continue
+                elif shared_is_private is False:
+                    # They already share a public room, so only update if this is
+                    # a private room
+                    if not is_public:
+                        to_update.add((other_user_id, user_id))
+                elif shared_is_private is None:
+                    # This is the first time they both share a room
+                    to_insert.add((other_user_id, user_id))
+
+        if to_insert:
+            yield self.store.add_users_who_share_room(
+                room_id, not is_public, to_insert,
+            )
+
+        if to_update:
+            yield self.store.update_users_who_share_room(
+                room_id, not is_public, to_update,
+            )
 
     @defer.inlineCallbacks
     def _handle_remove_user(self, room_id, user_id):
@@ -327,32 +497,29 @@ class UserDirectoyHandler(object):
         row = yield self.store.get_user_in_public_room(user_id)
         update_user_in_public = row and row["room_id"] == room_id
 
-        if not update_user_in_public and not update_user_dir:
-            return
-
-        # XXX: Make this faster?
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        for j_room_id in rooms:
-            if not update_user_in_public and not update_user_dir:
-                break
+        if (update_user_in_public or update_user_dir):
+            # XXX: Make this faster?
+            rooms = yield self.store.get_rooms_for_user(user_id)
+            for j_room_id in rooms:
+                if (not update_user_in_public and not update_user_dir):
+                    break
 
-            is_in_room = yield self.state.get_is_host_in_room(
-                j_room_id, self.server_name,
-            )
+                is_in_room = yield self.store.is_host_joined(
+                    j_room_id, self.server_name,
+                )
 
-            if not is_in_room:
-                continue
+                if not is_in_room:
+                    continue
 
-            if update_user_dir:
-                update_user_dir = False
-                yield self.store.update_user_in_user_dir(user_id, j_room_id)
+                if update_user_dir:
+                    update_user_dir = False
+                    yield self.store.update_user_in_user_dir(user_id, j_room_id)
 
-            if update_user_in_public:
                 is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
                     j_room_id
                 )
 
-                if is_public:
+                if update_user_in_public and is_public:
                     yield self.store.update_user_in_public_user_list(user_id, j_room_id)
                     update_user_in_public = False
 
@@ -361,16 +528,59 @@ class UserDirectoyHandler(object):
         elif update_user_in_public:
             yield self.store.remove_from_user_in_public_room(user_id)
 
+        # Now handle users_who_share_rooms.
+
+        # Get a list of user tuples that were in the DB due to this room and
+        # users (this includes tuples where the other user matches `user_id`)
+        user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
+            user_id, room_id,
+        )
+
+        for user_id, other_user_id in user_tuples:
+            # For each user tuple get a list of rooms that they still share,
+            # trying to find a private room, and update the entry in the DB
+            rooms = yield self.store.get_rooms_in_common_for_users(user_id, other_user_id)
+
+            # If they dont share a room anymore, remove the mapping
+            if not rooms:
+                yield self.store.remove_user_who_share_room(
+                    user_id, other_user_id,
+                )
+                continue
+
+            found_public_share = None
+            for j_room_id in rooms:
+                is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
+                    j_room_id
+                )
+
+                if is_public:
+                    found_public_share = j_room_id
+                else:
+                    found_public_share = None
+                    yield self.store.update_users_who_share_room(
+                        room_id, not is_public, [(user_id, other_user_id)],
+                    )
+                    break
+
+            if found_public_share:
+                yield self.store.update_users_who_share_room(
+                    room_id, not is_public, [(user_id, other_user_id)],
+                )
+
     @defer.inlineCallbacks
-    def _handle_profile_change(self, user_id, prev_event_id, event_id):
+    def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
         """Check member event changes for any profile changes and update the
         database if there are.
         """
         if not prev_event_id or not event_id:
             return
 
-        prev_event = yield self.store.get_event(prev_event_id)
-        event = yield self.store.get_event(event_id)
+        prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+        event = yield self.store.get_event(event_id, allow_none=True)
+
+        if not prev_event or not event:
+            return
 
         if event.membership != Membership.JOIN:
             return
@@ -382,7 +592,9 @@ class UserDirectoyHandler(object):
         new_avatar = event.content.get("avatar_url")
 
         if prev_name != new_name or prev_avatar != new_avatar:
-            yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
+            yield self.store.update_profile_in_user_dir(
+                user_id, new_name, new_avatar, room_id,
+            )
 
     @defer.inlineCallbacks
     def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 14715878c5..7ef3d526b1 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -412,7 +412,7 @@ def set_cors_headers(request):
     )
     request.setHeader(
         "Access-Control-Allow-Headers",
-        "Origin, X-Requested-With, Content-Type, Accept"
+        "Origin, X-Requested-With, Content-Type, Accept, Authorization"
     )
 
 
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6b1709d700..385208b574 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -167,7 +167,6 @@ class Notifier(object):
 
         self.clock = hs.get_clock()
         self.appservice_handler = hs.get_application_service_handler()
-        self.user_directory_handler = hs.get_user_directory_handler()
 
         if hs.should_send_federation():
             self.federation_sender = hs.get_federation_sender()
@@ -255,8 +254,6 @@ class Notifier(object):
             room_stream_id
         )
 
-        preserve_fn(self.user_directory_handler.notify_new_event)()
-
         if self.federation_sender:
             preserve_fn(self.federation_sender.notify_new_events)(
                 room_stream_id
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index c0f8176e3d..8a5d473108 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -275,7 +275,7 @@ class HttpPusher(object):
         if event.type == 'm.room.member':
             d['notification']['membership'] = event.content['membership']
             d['notification']['user_is_target'] = event.state_key == self.user_id
-        if 'content' in event:
+        if not self.hs.config.push_redact_content and 'content' in event:
             d['notification']['content'] = event.content
 
         # We no longer send aliases separately, instead, we send the human
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index a374f2f1a2..0d3f31a50c 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -16,6 +16,7 @@
 from ._base import BaseSlavedStore
 from synapse.storage import DataStore
 from synapse.config.appservice import load_appservices
+from synapse.storage.appservice import _make_exclusive_regex
 
 
 class SlavedApplicationServiceStore(BaseSlavedStore):
@@ -25,6 +26,7 @@ class SlavedApplicationServiceStore(BaseSlavedStore):
             hs.config.server_name,
             hs.config.app_service_config_files
         )
+        self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
 
     get_app_service_by_token = DataStore.get_app_service_by_token.__func__
     get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
@@ -38,3 +40,6 @@ class SlavedApplicationServiceStore(BaseSlavedStore):
     get_appservice_state = DataStore.get_appservice_state.__func__
     set_appservice_last_pos = DataStore.set_appservice_last_pos.__func__
     set_appservice_state = DataStore.set_appservice_state.__func__
+    get_if_app_services_interested_in_user = (
+        DataStore.get_if_app_services_interested_in_user.__func__
+    )
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
new file mode 100644
index 0000000000..65250285e8
--- /dev/null
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage.client_ips import LAST_SEEN_GRANULARITY
+from synapse.util.caches import CACHE_SIZE_FACTOR
+from synapse.util.caches.descriptors import Cache
+
+
+class SlavedClientIpStore(BaseSlavedStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedClientIpStore, self).__init__(db_conn, hs)
+
+        self.client_ip_last_seen = Cache(
+            name="client_ip_last_seen",
+            keylen=4,
+            max_entries=50000 * CACHE_SIZE_FACTOR,
+        )
+
+    def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
+        now = int(self._clock.time_msec())
+        user_id = user.to_string()
+        key = (user_id, access_token, ip)
+
+        try:
+            last_seen = self.client_ip_last_seen.get(key)
+        except KeyError:
+            last_seen = None
+
+        # Rate-limited inserts
+        if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
+            return
+
+        self.hs.get_tcp_replication().send_user_ip(
+            user_id, access_token, ip, user_agent, device_id, now
+        )
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index fcaf58b93b..94ebbffc1b 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -108,6 +108,8 @@ class SlavedEventStore(BaseSlavedStore):
     get_current_state_ids = (
         StateStore.__dict__["get_current_state_ids"]
     )
+    get_state_group_delta = StateStore.__dict__["get_state_group_delta"]
+    _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
     has_room_changed_since = DataStore.has_room_changed_since.__func__
 
     get_unread_push_actions_for_user_in_range_for_http = (
@@ -151,8 +153,7 @@ class SlavedEventStore(BaseSlavedStore):
     get_room_events_stream_for_rooms = (
         DataStore.get_room_events_stream_for_rooms.__func__
     )
-    is_host_joined = DataStore.is_host_joined.__func__
-    _is_host_joined = RoomMemberStore.__dict__["_is_host_joined"]
+    is_host_joined = RoomMemberStore.__dict__["is_host_joined"]
     get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
 
     _set_before_and_after = staticmethod(DataStore._set_before_and_after)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 90fb6c1336..6d2513c4e2 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -20,6 +20,7 @@ from twisted.internet.protocol import ReconnectingClientFactory
 
 from .commands import (
     FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand,
+    UserIpCommand,
 )
 from .protocol import ClientReplicationStreamProtocol
 
@@ -178,6 +179,12 @@ class ReplicationClientHandler(object):
         cmd = InvalidateCacheCommand(cache_func.__name__, keys)
         self.send_command(cmd)
 
+    def send_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
+        """Tell the master that the user made a request.
+        """
+        cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
+        self.send_command(cmd)
+
     def await_sync(self, data):
         """Returns a deferred that is resolved when we receive a SYNC command
         with given data.
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 84d2a2272a..a009214e43 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -304,6 +304,36 @@ class InvalidateCacheCommand(Command):
         return " ".join((self.cache_func, json.dumps(self.keys)))
 
 
+class UserIpCommand(Command):
+    """Sent periodically when a worker sees activity from a client.
+
+    Format::
+
+        USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent>
+    """
+    NAME = "USER_IP"
+
+    def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen):
+        self.user_id = user_id
+        self.access_token = access_token
+        self.ip = ip
+        self.user_agent = user_agent
+        self.device_id = device_id
+        self.last_seen = last_seen
+
+    @classmethod
+    def from_line(cls, line):
+        user_id, access_token, ip, device_id, last_seen, user_agent = line.split(" ", 5)
+
+        return cls(user_id, access_token, ip, user_agent, device_id, int(last_seen))
+
+    def to_line(self):
+        return " ".join((
+            self.user_id, self.access_token, self.ip, self.device_id,
+            str(self.last_seen), self.user_agent,
+        ))
+
+
 # Map of command name to command type.
 COMMAND_MAP = {
     cmd.NAME: cmd
@@ -320,6 +350,7 @@ COMMAND_MAP = {
         SyncCommand,
         RemovePusherCommand,
         InvalidateCacheCommand,
+        UserIpCommand,
     )
 }
 
@@ -342,5 +373,6 @@ VALID_CLIENT_COMMANDS = (
     FederationAckCommand.NAME,
     RemovePusherCommand.NAME,
     InvalidateCacheCommand.NAME,
+    UserIpCommand.NAME,
     ErrorCommand.NAME,
 )
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 9fee2a484b..062272f8dd 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -406,6 +406,12 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
     def on_INVALIDATE_CACHE(self, cmd):
         self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
 
+    def on_USER_IP(self, cmd):
+        self.streamer.on_user_ip(
+            cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
+            cmd.last_seen,
+        )
+
     @defer.inlineCallbacks
     def subscribe_to_stream(self, stream_name, token):
         """Subscribe the remote to a streams.
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 8b2c4c3043..3ea3ca5a6f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -35,6 +35,7 @@ user_sync_counter = metrics.register_counter("user_sync")
 federation_ack_counter = metrics.register_counter("federation_ack")
 remove_pusher_counter = metrics.register_counter("remove_pusher")
 invalidate_cache_counter = metrics.register_counter("invalidate_cache")
+user_ip_cache_counter = metrics.register_counter("user_ip_cache")
 
 logger = logging.getLogger(__name__)
 
@@ -67,6 +68,7 @@ class ReplicationStreamer(object):
         self.store = hs.get_datastore()
         self.presence_handler = hs.get_presence_handler()
         self.clock = hs.get_clock()
+        self.notifier = hs.get_notifier()
 
         # Current connections.
         self.connections = []
@@ -99,7 +101,7 @@ class ReplicationStreamer(object):
         if not hs.config.send_federation:
             self.federation_sender = hs.get_federation_sender()
 
-        hs.get_notifier().add_replication_callback(self.on_notifier_poke)
+        self.notifier.add_replication_callback(self.on_notifier_poke)
 
         # Keeps track of whether we are currently checking for updates
         self.is_looping = False
@@ -237,6 +239,15 @@ class ReplicationStreamer(object):
         invalidate_cache_counter.inc()
         getattr(self.store, cache_func).invalidate(tuple(keys))
 
+    @measure_func("repl.on_user_ip")
+    def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
+        """The client saw a user request
+        """
+        user_ip_cache_counter.inc()
+        self.store.insert_client_ip(
+            user_id, access_token, ip, user_agent, device_id, last_seen,
+        )
+
     def send_sync_to_all_connections(self, data):
         """Sends a SYNC command to all clients.
 
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 369d5f2428..fbafe12cc2 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -112,6 +112,12 @@ AccountDataStreamRow = namedtuple("AccountDataStream", (
     "data_type",  # str
     "data",  # dict
 ))
+CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
+    "room_id",  # str
+    "type",  # str
+    "state_key",  # str
+    "event_id",  # str, optional
+))
 
 
 class Stream(object):
@@ -443,6 +449,21 @@ class AccountDataStream(Stream):
         defer.returnValue(results)
 
 
+class CurrentStateDeltaStream(Stream):
+    """Current state for a room was changed
+    """
+    NAME = "current_state_deltas"
+    ROW_TYPE = CurrentStateDeltaStreamRow
+
+    def __init__(self, hs):
+        store = hs.get_datastore()
+
+        self.current_token = store.get_max_current_state_delta_stream_id
+        self.update_function = store.get_all_updated_current_state_deltas
+
+        super(CurrentStateDeltaStream, self).__init__(hs)
+
+
 STREAMS_MAP = {
     stream.NAME: stream
     for stream in (
@@ -460,5 +481,6 @@ STREAMS_MAP = {
         FederationStream,
         TagAccountDataStream,
         AccountDataStream,
+        CurrentStateDeltaStream,
     )
 }
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 29fcd72375..7d786e8de3 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -15,8 +15,9 @@
 
 from twisted.internet import defer
 
+from synapse.api.constants import Membership
 from synapse.api.errors import AuthError, SynapseError
-from synapse.types import UserID
+from synapse.types import UserID, create_requester
 from synapse.http.servlet import parse_json_object_from_request
 
 from .base import ClientV1RestServlet, client_path_patterns
@@ -157,6 +158,142 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
         defer.returnValue((200, {}))
 
 
+class ShutdownRoomRestServlet(ClientV1RestServlet):
+    """Shuts down a room by removing all local users from the room and blocking
+    all future invites and joins to the room. Any local aliases will be repointed
+    to a new room created by `new_room_user_id` and kicked users will be auto
+    joined to the new room.
+    """
+    PATTERNS = client_path_patterns("/admin/shutdown_room/(?P<room_id>[^/]+)")
+
+    DEFAULT_MESSAGE = (
+        "Sharing illegal content on this server is not permitted and rooms in"
+        " violatation will be blocked."
+    )
+
+    def __init__(self, hs):
+        super(ShutdownRoomRestServlet, self).__init__(hs)
+        self.store = hs.get_datastore()
+        self.handlers = hs.get_handlers()
+        self.state = hs.get_state_handler()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request, room_id):
+        requester = yield self.auth.get_user_by_req(request)
+        is_admin = yield self.auth.is_server_admin(requester.user)
+        if not is_admin:
+            raise AuthError(403, "You are not a server admin")
+
+        content = parse_json_object_from_request(request)
+
+        new_room_user_id = content.get("new_room_user_id")
+        if not new_room_user_id:
+            raise SynapseError(400, "Please provide field `new_room_user_id`")
+
+        room_creator_requester = create_requester(new_room_user_id)
+
+        message = content.get("message", self.DEFAULT_MESSAGE)
+        room_name = content.get("room_name", "Content Violation Notification")
+
+        info = yield self.handlers.room_creation_handler.create_room(
+            room_creator_requester,
+            config={
+                "preset": "public_chat",
+                "name": room_name,
+                "power_level_content_override": {
+                    "users_default": -10,
+                },
+            },
+            ratelimit=False,
+        )
+        new_room_id = info["room_id"]
+
+        msg_handler = self.handlers.message_handler
+        yield msg_handler.create_and_send_nonmember_event(
+            room_creator_requester,
+            {
+                "type": "m.room.message",
+                "content": {"body": message, "msgtype": "m.text"},
+                "room_id": new_room_id,
+                "sender": new_room_user_id,
+            },
+            ratelimit=False,
+        )
+
+        requester_user_id = requester.user.to_string()
+
+        logger.info("Shutting down room %r", room_id)
+
+        yield self.store.block_room(room_id, requester_user_id)
+
+        users = yield self.state.get_current_user_in_room(room_id)
+        kicked_users = []
+        for user_id in users:
+            if not self.hs.is_mine_id(user_id):
+                continue
+
+            logger.info("Kicking %r from %r...", user_id, room_id)
+
+            target_requester = create_requester(user_id)
+            yield self.handlers.room_member_handler.update_membership(
+                requester=target_requester,
+                target=target_requester.user,
+                room_id=room_id,
+                action=Membership.LEAVE,
+                content={},
+                ratelimit=False
+            )
+
+            yield self.handlers.room_member_handler.forget(target_requester.user, room_id)
+
+            yield self.handlers.room_member_handler.update_membership(
+                requester=target_requester,
+                target=target_requester.user,
+                room_id=new_room_id,
+                action=Membership.JOIN,
+                content={},
+                ratelimit=False
+            )
+
+            kicked_users.append(user_id)
+
+        aliases_for_room = yield self.store.get_aliases_for_room(room_id)
+
+        yield self.store.update_aliases_for_room(
+            room_id, new_room_id, requester_user_id
+        )
+
+        defer.returnValue((200, {
+            "kicked_users": kicked_users,
+            "local_aliases": aliases_for_room,
+            "new_room_id": new_room_id,
+        }))
+
+
+class QuarantineMediaInRoom(ClientV1RestServlet):
+    """Quarantines all media in a room so that no one can download it via
+    this server.
+    """
+    PATTERNS = client_path_patterns("/admin/quarantine_media/(?P<room_id>[^/]+)")
+
+    def __init__(self, hs):
+        super(QuarantineMediaInRoom, self).__init__(hs)
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request, room_id):
+        requester = yield self.auth.get_user_by_req(request)
+        is_admin = yield self.auth.is_server_admin(requester.user)
+        if not is_admin:
+            raise AuthError(403, "You are not a server admin")
+
+        num_quarantined = yield self.store.quarantine_media_ids_in_room(
+            room_id, requester.user.to_string(),
+        )
+
+        defer.returnValue((200, {"num_quarantined": num_quarantined}))
+
+
 class ResetPasswordRestServlet(ClientV1RestServlet):
     """Post request to allow an administrator reset password for a user.
     This need a user have a administrator access in Synapse.
@@ -353,3 +490,5 @@ def register_servlets(hs, http_server):
     ResetPasswordRestServlet(hs).register(http_server)
     GetUsersPaginatedRestServlet(hs).register(http_server)
     SearchUsersRestServlet(hs).register(http_server)
+    ShutdownRoomRestServlet(hs).register(http_server)
+    QuarantineMediaInRoom(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/user_directory.py b/synapse/rest/client/v2_alpha/user_directory.py
index 17d3dffc8f..6e012da4aa 100644
--- a/synapse/rest/client/v2_alpha/user_directory.py
+++ b/synapse/rest/client/v2_alpha/user_directory.py
@@ -55,7 +55,9 @@ class UserDirectorySearchRestServlet(RestServlet):
                     ]
                 }
         """
-        yield self.auth.get_user_by_req(request, allow_guest=False)
+        requester = yield self.auth.get_user_by_req(request, allow_guest=False)
+        user_id = requester.user.to_string()
+
         body = parse_json_object_from_request(request)
 
         limit = body.get("limit", 10)
@@ -66,7 +68,9 @@ class UserDirectorySearchRestServlet(RestServlet):
         except:
             raise SynapseError(400, "`search_term` is required field")
 
-        results = yield self.user_directory_handler.search_users(search_term, limit)
+        results = yield self.user_directory_handler.search_users(
+            user_id, search_term, limit,
+        )
 
         defer.returnValue((200, results))
 
diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py
index 6788375e85..6879249c8a 100644
--- a/synapse/rest/media/v1/download_resource.py
+++ b/synapse/rest/media/v1/download_resource.py
@@ -66,14 +66,19 @@ class DownloadResource(Resource):
     @defer.inlineCallbacks
     def _respond_local_file(self, request, media_id, name):
         media_info = yield self.store.get_local_media(media_id)
-        if not media_info:
+        if not media_info or media_info["quarantined_by"]:
             respond_404(request)
             return
 
         media_type = media_info["media_type"]
         media_length = media_info["media_length"]
         upload_name = name if name else media_info["upload_name"]
-        file_path = self.filepaths.local_media_filepath(media_id)
+        if media_info["url_cache"]:
+            # TODO: Check the file still exists, if it doesn't we can redownload
+            # it from the url `media_info["url_cache"]`
+            file_path = self.filepaths.url_cache_filepath(media_id)
+        else:
+            file_path = self.filepaths.local_media_filepath(media_id)
 
         yield respond_with_file(
             request, media_type, file_path, media_length,
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py
index 0137458f71..d92b7ff337 100644
--- a/synapse/rest/media/v1/filepath.py
+++ b/synapse/rest/media/v1/filepath.py
@@ -71,3 +71,21 @@ class MediaFilePaths(object):
             self.base_path, "remote_thumbnail", server_name,
             file_id[0:2], file_id[2:4], file_id[4:],
         )
+
+    def url_cache_filepath(self, media_id):
+        return os.path.join(
+            self.base_path, "url_cache",
+            media_id[0:2], media_id[2:4], media_id[4:]
+        )
+
+    def url_cache_thumbnail(self, media_id, width, height, content_type,
+                            method):
+        top_level_type, sub_type = content_type.split("/")
+        file_name = "%i-%i-%s-%s-%s" % (
+            width, height, top_level_type, sub_type, method
+        )
+        return os.path.join(
+            self.base_path, "url_cache_thumbnails",
+            media_id[0:2], media_id[2:4], media_id[4:],
+            file_name
+        )
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index caca96c222..0ea1248ce6 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -135,6 +135,8 @@ class MediaRepository(object):
             media_info = yield self._download_remote_file(
                 server_name, media_id
             )
+        elif media_info["quarantined_by"]:
+            raise NotFoundError()
         else:
             self.recently_accessed_remotes.add((server_name, media_id))
             yield self.store.update_cached_last_access_time(
@@ -184,6 +186,7 @@ class MediaRepository(object):
                     raise
                 except NotRetryingDestination:
                     logger.warn("Not retrying destination %r", server_name)
+                    raise SynapseError(502, "Failed to fetch remote media")
                 except Exception:
                     logger.exception("Failed to fetch remote media %s/%s",
                                      server_name, media_id)
@@ -323,13 +326,17 @@ class MediaRepository(object):
             defer.returnValue(t_path)
 
     @defer.inlineCallbacks
-    def _generate_local_thumbnails(self, media_id, media_info):
+    def _generate_local_thumbnails(self, media_id, media_info, url_cache=False):
         media_type = media_info["media_type"]
         requirements = self._get_thumbnail_requirements(media_type)
         if not requirements:
             return
 
-        input_path = self.filepaths.local_media_filepath(media_id)
+        if url_cache:
+            input_path = self.filepaths.url_cache_filepath(media_id)
+        else:
+            input_path = self.filepaths.local_media_filepath(media_id)
+
         thumbnailer = Thumbnailer(input_path)
         m_width = thumbnailer.width
         m_height = thumbnailer.height
@@ -357,9 +364,14 @@ class MediaRepository(object):
 
             for t_width, t_height, t_type in scales:
                 t_method = "scale"
-                t_path = self.filepaths.local_media_thumbnail(
-                    media_id, t_width, t_height, t_type, t_method
-                )
+                if url_cache:
+                    t_path = self.filepaths.url_cache_thumbnail(
+                        media_id, t_width, t_height, t_type, t_method
+                    )
+                else:
+                    t_path = self.filepaths.local_media_thumbnail(
+                        media_id, t_width, t_height, t_type, t_method
+                    )
                 self._makedirs(t_path)
                 t_len = thumbnailer.scale(t_path, t_width, t_height, t_type)
 
@@ -374,9 +386,14 @@ class MediaRepository(object):
                     # thumbnail.
                     continue
                 t_method = "crop"
-                t_path = self.filepaths.local_media_thumbnail(
-                    media_id, t_width, t_height, t_type, t_method
-                )
+                if url_cache:
+                    t_path = self.filepaths.url_cache_thumbnail(
+                        media_id, t_width, t_height, t_type, t_method
+                    )
+                else:
+                    t_path = self.filepaths.local_media_thumbnail(
+                        media_id, t_width, t_height, t_type, t_method
+                    )
                 self._makedirs(t_path)
                 t_len = thumbnailer.crop(t_path, t_width, t_height, t_type)
                 local_thumbnails.append((
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index c680fddab5..b81a336c5d 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -164,7 +164,7 @@ class PreviewUrlResource(Resource):
 
         if _is_media(media_info['media_type']):
             dims = yield self.media_repo._generate_local_thumbnails(
-                media_info['filesystem_id'], media_info
+                media_info['filesystem_id'], media_info, url_cache=True,
             )
 
             og = {
@@ -210,7 +210,7 @@ class PreviewUrlResource(Resource):
                 if _is_media(image_info['media_type']):
                     # TODO: make sure we don't choke on white-on-transparent images
                     dims = yield self.media_repo._generate_local_thumbnails(
-                        image_info['filesystem_id'], image_info
+                        image_info['filesystem_id'], image_info, url_cache=True,
                     )
                     if dims:
                         og["og:image:width"] = dims['width']
@@ -256,7 +256,7 @@ class PreviewUrlResource(Resource):
         # XXX: horrible duplication with base_resource's _download_remote_file()
         file_id = random_string(24)
 
-        fname = self.filepaths.local_media_filepath(file_id)
+        fname = self.filepaths.url_cache_filepath(file_id)
         self.media_repo._makedirs(fname)
 
         try:
@@ -303,6 +303,7 @@ class PreviewUrlResource(Resource):
                 upload_name=download_name,
                 media_length=length,
                 user_id=user,
+                url_cache=url,
             )
 
         except Exception as e:
diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py
index d8f54adc99..68d56b2b10 100644
--- a/synapse/rest/media/v1/thumbnail_resource.py
+++ b/synapse/rest/media/v1/thumbnail_resource.py
@@ -81,7 +81,7 @@ class ThumbnailResource(Resource):
                                  method, m_type):
         media_info = yield self.store.get_local_media(media_id)
 
-        if not media_info:
+        if not media_info or media_info["quarantined_by"]:
             respond_404(request)
             return
 
@@ -101,9 +101,16 @@ class ThumbnailResource(Resource):
             t_type = thumbnail_info["thumbnail_type"]
             t_method = thumbnail_info["thumbnail_method"]
 
-            file_path = self.filepaths.local_media_thumbnail(
-                media_id, t_width, t_height, t_type, t_method,
-            )
+            if media_info["url_cache"]:
+                # TODO: Check the file still exists, if it doesn't we can redownload
+                # it from the url `media_info["url_cache"]`
+                file_path = self.filepaths.url_cache_thumbnail(
+                    media_id, t_width, t_height, t_type, t_method,
+                )
+            else:
+                file_path = self.filepaths.local_media_thumbnail(
+                    media_id, t_width, t_height, t_type, t_method,
+                )
             yield respond_with_file(request, t_type, file_path)
 
         else:
@@ -117,7 +124,7 @@ class ThumbnailResource(Resource):
                                             desired_type):
         media_info = yield self.store.get_local_media(media_id)
 
-        if not media_info:
+        if not media_info or media_info["quarantined_by"]:
             respond_404(request)
             return
 
@@ -134,9 +141,18 @@ class ThumbnailResource(Resource):
             t_type = info["thumbnail_type"] == desired_type
 
             if t_w and t_h and t_method and t_type:
-                file_path = self.filepaths.local_media_thumbnail(
-                    media_id, desired_width, desired_height, desired_type, desired_method,
-                )
+                if media_info["url_cache"]:
+                    # TODO: Check the file still exists, if it doesn't we can redownload
+                    # it from the url `media_info["url_cache"]`
+                    file_path = self.filepaths.url_cache_thumbnail(
+                        media_id, desired_width, desired_height, desired_type,
+                        desired_method,
+                    )
+                else:
+                    file_path = self.filepaths.local_media_thumbnail(
+                        media_id, desired_width, desired_height, desired_type,
+                        desired_method,
+                    )
                 yield respond_with_file(request, desired_type, file_path)
                 return
 
diff --git a/synapse/state.py b/synapse/state.py
index dffa79e4c9..390799fbd5 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -24,13 +24,13 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import AuthError
 from synapse.events.snapshot import EventContext
 from synapse.util.async import Linearizer
+from synapse.util.caches import CACHE_SIZE_FACTOR
 
 from collections import namedtuple
 from frozendict import frozendict
 
 import logging
 import hashlib
-import os
 
 logger = logging.getLogger(__name__)
 
@@ -38,9 +38,6 @@ logger = logging.getLogger(__name__)
 KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key"))
 
 
-CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
-
-
 SIZE_OF_CACHE = int(100000 * CACHE_SIZE_FACTOR)
 EVICTION_TIMEOUT_SECONDS = 60 * 60
 
@@ -170,9 +167,7 @@ class StateHandler(object):
             latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
         logger.debug("calling resolve_state_groups from get_current_user_in_room")
         entry = yield self.resolve_state_groups(room_id, latest_event_ids)
-        joined_users = yield self.store.get_joined_users_from_state(
-            room_id, entry.state_id, entry.state
-        )
+        joined_users = yield self.store.get_joined_users_from_state(room_id, entry)
         defer.returnValue(joined_users)
 
     @defer.inlineCallbacks
@@ -181,23 +176,10 @@ class StateHandler(object):
             latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
         logger.debug("calling resolve_state_groups from get_current_hosts_in_room")
         entry = yield self.resolve_state_groups(room_id, latest_event_ids)
-        joined_hosts = yield self.store.get_joined_hosts(
-            room_id, entry.state_id, entry.state
-        )
+        joined_hosts = yield self.store.get_joined_hosts(room_id, entry)
         defer.returnValue(joined_hosts)
 
     @defer.inlineCallbacks
-    def get_is_host_in_room(self, room_id, host, latest_event_ids=None):
-        if not latest_event_ids:
-            latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
-        logger.debug("calling resolve_state_groups from get_is_host_in_room")
-        entry = yield self.resolve_state_groups(room_id, latest_event_ids)
-        is_host_joined = yield self.store.is_host_joined(
-            room_id, host, entry.state_id, entry.state
-        )
-        defer.returnValue(is_host_joined)
-
-    @defer.inlineCallbacks
     def compute_event_context(self, event, old_state=None):
         """Build an EventContext structure for the event.
 
@@ -206,12 +188,12 @@ class StateHandler(object):
         Returns:
             synapse.events.snapshot.EventContext:
         """
-        context = EventContext()
 
         if event.internal_metadata.is_outlier():
             # If this is an outlier, then we know it shouldn't have any current
             # state. Certainly store.get_current_state won't return any, and
             # persisting the event won't store the state group.
+            context = EventContext()
             if old_state:
                 context.prev_state_ids = {
                     (s.type, s.state_key): s.event_id for s in old_state
@@ -230,6 +212,7 @@ class StateHandler(object):
             defer.returnValue(context)
 
         if old_state:
+            context = EventContext()
             context.prev_state_ids = {
                 (s.type, s.state_key): s.event_id for s in old_state
             }
@@ -250,19 +233,13 @@ class StateHandler(object):
             defer.returnValue(context)
 
         logger.debug("calling resolve_state_groups from compute_event_context")
-        if event.is_state():
-            entry = yield self.resolve_state_groups(
-                event.room_id, [e for e, _ in event.prev_events],
-                event_type=event.type,
-                state_key=event.state_key,
-            )
-        else:
-            entry = yield self.resolve_state_groups(
-                event.room_id, [e for e, _ in event.prev_events],
-            )
+        entry = yield self.resolve_state_groups(
+            event.room_id, [e for e, _ in event.prev_events],
+        )
 
         curr_state = entry.state
 
+        context = EventContext()
         context.prev_state_ids = curr_state
         if event.is_state():
             context.state_group = self.store.get_next_state_group()
@@ -275,10 +252,14 @@ class StateHandler(object):
             context.current_state_ids = dict(context.prev_state_ids)
             context.current_state_ids[key] = event.event_id
 
-            context.prev_group = entry.prev_group
-            context.delta_ids = entry.delta_ids
-            if context.delta_ids is not None:
-                context.delta_ids = dict(context.delta_ids)
+            if entry.state_group:
+                context.prev_group = entry.state_group
+                context.delta_ids = {
+                    key: event.event_id
+                }
+            elif entry.prev_group:
+                context.prev_group = entry.prev_group
+                context.delta_ids = dict(entry.delta_ids)
                 context.delta_ids[key] = event.event_id
         else:
             if entry.state_group is None:
@@ -295,7 +276,7 @@ class StateHandler(object):
 
     @defer.inlineCallbacks
     @log_function
-    def resolve_state_groups(self, room_id, event_ids, event_type=None, state_key=""):
+    def resolve_state_groups(self, room_id, event_ids):
         """ Given a list of event_ids this method fetches the state at each
         event, resolves conflicts between them and returns them.
 
@@ -320,11 +301,13 @@ class StateHandler(object):
         if len(group_names) == 1:
             name, state_list = state_groups_ids.items().pop()
 
+            prev_group, delta_ids = yield self.store.get_state_group_delta(name)
+
             defer.returnValue(_StateCacheEntry(
                 state=state_list,
                 state_group=name,
-                prev_group=name,
-                delta_ids={},
+                prev_group=prev_group,
+                delta_ids=delta_ids,
             ))
 
         with (yield self.resolve_linearizer.queue(group_names)):
@@ -368,20 +351,18 @@ class StateHandler(object):
                 if new_state_event_ids == frozenset(e_id for e_id in events):
                     state_group = sg
                     break
-            if state_group is None:
-                # Worker instances don't have access to this method, but we want
-                # to set the state_group on the main instance to increase cache
-                # hits.
-                if hasattr(self.store, "get_next_state_group"):
-                    state_group = self.store.get_next_state_group()
+
+            # TODO: We want to create a state group for this set of events, to
+            # increase cache hits, but we need to make sure that it doesn't
+            # end up as a prev_group without being added to the database
 
             prev_group = None
             delta_ids = None
-            for old_group, old_ids in state_groups_ids.items():
-                if not set(new_state.iterkeys()) - set(old_ids.iterkeys()):
+            for old_group, old_ids in state_groups_ids.iteritems():
+                if not set(new_state) - set(old_ids):
                     n_delta_ids = {
                         k: v
-                        for k, v in new_state.items()
+                        for k, v in new_state.iteritems()
                         if old_ids.get(k) != v
                     }
                     if not delta_ids or len(n_delta_ids) < len(delta_ids):
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 3c88ba9860..b92472df33 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -239,13 +239,14 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn.cursor(),
             name="_find_stream_orderings_for_times_txn",
             database_engine=self.database_engine,
-            after_callbacks=[]
+            after_callbacks=[],
+            final_callbacks=[],
         )
         self._find_stream_orderings_for_times_txn(cur)
         cur.close()
 
         self.find_stream_orderings_looping_call = self._clock.looping_call(
-            self._find_stream_orderings_for_times, 60 * 60 * 1000
+            self._find_stream_orderings_for_times, 10 * 60 * 1000
         )
 
         self._stream_order_on_start = self.get_room_max_stream_ordering()
@@ -286,31 +287,23 @@ class DataStore(RoomMemberStore, RoomStore,
         Counts the number of users who used this homeserver in the last 24 hours.
         """
         def _count_users(txn):
-            txn.execute(
-                "SELECT COUNT(DISTINCT user_id) AS users"
-                " FROM user_ips"
-                " WHERE last_seen > ?",
-                # This is close enough to a day for our purposes.
-                (int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),)
-            )
-            rows = self.cursor_to_dict(txn)
-            if rows:
-                return rows[0]["users"]
-            return 0
+            yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),
+
+            sql = """
+                SELECT COALESCE(count(*), 0) FROM (
+                    SELECT user_id FROM user_ips
+                    WHERE last_seen > ?
+                    GROUP BY user_id
+                ) u
+            """
+
+            txn.execute(sql, (yesterday,))
+            count, = txn.fetchone()
+            return count
 
         ret = yield self.runInteraction("count_users", _count_users)
         defer.returnValue(ret)
 
-    def get_user_ip_and_agents(self, user):
-        return self._simple_select_list(
-            table="user_ips",
-            keyvalues={"user_id": user.to_string()},
-            retcols=[
-                "access_token", "ip", "user_agent", "last_seen"
-            ],
-            desc="get_user_ip_and_agents",
-        )
-
     def get_users(self):
         """Function to reterive a list of users in users table.
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index db816346f5..6f54036d67 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -16,6 +16,7 @@ import logging
 
 from synapse.api.errors import StoreError
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.caches.dictionary_cache import DictionaryCache
 from synapse.util.caches.descriptors import Cache
 from synapse.storage.engines import PostgresEngine
@@ -27,10 +28,6 @@ from twisted.internet import defer
 import sys
 import time
 import threading
-import os
-
-
-CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
 
 
 logger = logging.getLogger(__name__)
@@ -52,13 +49,17 @@ class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
     passed to the constructor. Adds logging and metrics to the .execute()
     method."""
-    __slots__ = ["txn", "name", "database_engine", "after_callbacks"]
+    __slots__ = [
+        "txn", "name", "database_engine", "after_callbacks", "final_callbacks",
+    ]
 
-    def __init__(self, txn, name, database_engine, after_callbacks):
+    def __init__(self, txn, name, database_engine, after_callbacks,
+                 final_callbacks):
         object.__setattr__(self, "txn", txn)
         object.__setattr__(self, "name", name)
         object.__setattr__(self, "database_engine", database_engine)
         object.__setattr__(self, "after_callbacks", after_callbacks)
+        object.__setattr__(self, "final_callbacks", final_callbacks)
 
     def call_after(self, callback, *args, **kwargs):
         """Call the given callback on the main twisted thread after the
@@ -67,6 +68,9 @@ class LoggingTransaction(object):
         """
         self.after_callbacks.append((callback, args, kwargs))
 
+    def call_finally(self, callback, *args, **kwargs):
+        self.final_callbacks.append((callback, args, kwargs))
+
     def __getattr__(self, name):
         return getattr(self.txn, name)
 
@@ -217,8 +221,8 @@ class SQLBaseStore(object):
 
         self._clock.looping_call(loop, 10000)
 
-    def _new_transaction(self, conn, desc, after_callbacks, logging_context,
-                         func, *args, **kwargs):
+    def _new_transaction(self, conn, desc, after_callbacks, final_callbacks,
+                         logging_context, func, *args, **kwargs):
         start = time.time() * 1000
         txn_id = self._TXN_ID
 
@@ -237,7 +241,8 @@ class SQLBaseStore(object):
                 try:
                     txn = conn.cursor()
                     txn = LoggingTransaction(
-                        txn, name, self.database_engine, after_callbacks
+                        txn, name, self.database_engine, after_callbacks,
+                        final_callbacks,
                     )
                     r = func(txn, *args, **kwargs)
                     conn.commit()
@@ -298,6 +303,7 @@ class SQLBaseStore(object):
         start_time = time.time() * 1000
 
         after_callbacks = []
+        final_callbacks = []
 
         def inner_func(conn, *args, **kwargs):
             with LoggingContext("runInteraction") as context:
@@ -309,7 +315,7 @@ class SQLBaseStore(object):
 
                 current_context.copy_to(context)
                 return self._new_transaction(
-                    conn, desc, after_callbacks, current_context,
+                    conn, desc, after_callbacks, final_callbacks, current_context,
                     func, *args, **kwargs
                 )
 
@@ -318,9 +324,13 @@ class SQLBaseStore(object):
                 result = yield self._db_pool.runWithConnection(
                     inner_func, *args, **kwargs
                 )
-        finally:
+
             for after_callback, after_args, after_kwargs in after_callbacks:
                 after_callback(*after_args, **after_kwargs)
+        finally:
+            for after_callback, after_args, after_kwargs in final_callbacks:
+                after_callback(*after_args, **after_kwargs)
+
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -941,7 +951,7 @@ class SQLBaseStore(object):
             # __exit__ called after the transaction finishes.
             ctx = self._cache_id_gen.get_next()
             stream_id = ctx.__enter__()
-            txn.call_after(ctx.__exit__, None, None, None)
+            txn.call_finally(ctx.__exit__, None, None, None)
             txn.call_after(self.hs.get_notifier().on_new_replication_data)
 
             self._simple_insert_txn(
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 532df736a5..c63935cb07 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -27,6 +27,25 @@ from ._base import SQLBaseStore
 logger = logging.getLogger(__name__)
 
 
+def _make_exclusive_regex(services_cache):
+    # We precompie a regex constructed from all the regexes that the AS's
+    # have registered for exclusive users.
+    exclusive_user_regexes = [
+        regex.pattern
+        for service in services_cache
+        for regex in service.get_exlusive_user_regexes()
+    ]
+    if exclusive_user_regexes:
+        exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
+        exclusive_user_regex = re.compile(exclusive_user_regex)
+    else:
+        # We handle this case specially otherwise the constructed regex
+        # will always match
+        exclusive_user_regex = None
+
+    return exclusive_user_regex
+
+
 class ApplicationServiceStore(SQLBaseStore):
 
     def __init__(self, hs):
@@ -36,21 +55,7 @@ class ApplicationServiceStore(SQLBaseStore):
             hs.hostname,
             hs.config.app_service_config_files
         )
-
-        # We precompie a regex constructed from all the regexes that the AS's
-        # have registered for exclusive users.
-        exclusive_user_regexes = [
-            regex.pattern
-            for service in self.services_cache
-            for regex in service.get_exlusive_user_regexes()
-        ]
-        if exclusive_user_regexes:
-            exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
-            self.exclusive_user_regex = re.compile(exclusive_user_regex)
-        else:
-            # We handle this case specially otherwise the constructed regex
-            # will always match
-            self.exclusive_user_regex = None
+        self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)
 
     def get_app_services(self):
         return self.services_cache
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 747d2df622..fc468ea185 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -15,11 +15,14 @@
 
 import logging
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 
 from ._base import Cache
 from . import background_updates
 
+from synapse.util.caches import CACHE_SIZE_FACTOR
+
+
 logger = logging.getLogger(__name__)
 
 # Number of msec of granularity to store the user IP 'last seen' time. Smaller
@@ -33,7 +36,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         self.client_ip_last_seen = Cache(
             name="client_ip_last_seen",
             keylen=4,
-            max_entries=5000,
+            max_entries=50000 * CACHE_SIZE_FACTOR,
         )
 
         super(ClientIpStore, self).__init__(hs)
@@ -45,7 +48,14 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             columns=["user_id", "device_id", "last_seen"],
         )
 
-    @defer.inlineCallbacks
+        # (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
+        self._batch_row_update = {}
+
+        self._client_ip_looper = self._clock.looping_call(
+            self._update_client_ips_batch, 5 * 1000
+        )
+        reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
+
     def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
         now = int(self._clock.time_msec())
         key = (user.to_string(), access_token, ip)
@@ -57,34 +67,48 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
 
         # Rate-limited inserts
         if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
-            defer.returnValue(None)
+            return
 
         self.client_ip_last_seen.prefill(key, now)
 
-        # It's safe not to lock here: a) no unique constraint,
-        # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
-        yield self._simple_upsert(
-            "user_ips",
-            keyvalues={
-                "user_id": user.to_string(),
-                "access_token": access_token,
-                "ip": ip,
-                "user_agent": user_agent,
-                "device_id": device_id,
-            },
-            values={
-                "last_seen": now,
-            },
-            desc="insert_client_ip",
-            lock=False,
+        self._batch_row_update[key] = (user_agent, device_id, now)
+
+    def _update_client_ips_batch(self):
+        to_update = self._batch_row_update
+        self._batch_row_update = {}
+        return self.runInteraction(
+            "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
         )
 
+    def _update_client_ips_batch_txn(self, txn, to_update):
+        self.database_engine.lock_table(txn, "user_ips")
+
+        for entry in to_update.iteritems():
+            (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
+
+            self._simple_upsert_txn(
+                txn,
+                table="user_ips",
+                keyvalues={
+                    "user_id": user_id,
+                    "access_token": access_token,
+                    "ip": ip,
+                    "user_agent": user_agent,
+                    "device_id": device_id,
+                },
+                values={
+                    "last_seen": last_seen,
+                },
+                lock=False,
+            )
+
     @defer.inlineCallbacks
-    def get_last_client_ip_by_device(self, devices):
+    def get_last_client_ip_by_device(self, user_id, device_id):
         """For each device_id listed, give the user_ip it was last seen on
 
         Args:
-            devices (iterable[(str, str)]):  list of (user_id, device_id) pairs
+            user_id (str)
+            device_id (str): If None fetches all devices for the user
 
         Returns:
             defer.Deferred: resolves to a dict, where the keys
@@ -95,6 +119,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         res = yield self.runInteraction(
             "get_last_client_ip_by_device",
             self._get_last_client_ip_by_device_txn,
+            user_id, device_id,
             retcols=(
                 "user_id",
                 "access_token",
@@ -103,23 +128,34 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                 "device_id",
                 "last_seen",
             ),
-            devices=devices
         )
 
         ret = {(d["user_id"], d["device_id"]): d for d in res}
+        for key in self._batch_row_update:
+            uid, access_token, ip = key
+            if uid == user_id:
+                user_agent, did, last_seen = self._batch_row_update[key]
+                if not device_id or did == device_id:
+                    ret[(user_id, device_id)] = {
+                        "user_id": user_id,
+                        "access_token": access_token,
+                        "ip": ip,
+                        "user_agent": user_agent,
+                        "device_id": did,
+                        "last_seen": last_seen,
+                    }
         defer.returnValue(ret)
 
     @classmethod
-    def _get_last_client_ip_by_device_txn(cls, txn, devices, retcols):
+    def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols):
         where_clauses = []
         bindings = []
-        for (user_id, device_id) in devices:
-            if device_id is None:
-                where_clauses.append("(user_id = ? AND device_id IS NULL)")
-                bindings.extend((user_id, ))
-            else:
-                where_clauses.append("(user_id = ? AND device_id = ?)")
-                bindings.extend((user_id, device_id))
+        if device_id is None:
+            where_clauses.append("user_id = ?")
+            bindings.extend((user_id, ))
+        else:
+            where_clauses.append("(user_id = ? AND device_id = ?)")
+            bindings.extend((user_id, device_id))
 
         if not where_clauses:
             return []
@@ -147,3 +183,37 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
 
         txn.execute(sql, bindings)
         return cls.cursor_to_dict(txn)
+
+    @defer.inlineCallbacks
+    def get_user_ip_and_agents(self, user):
+        user_id = user.to_string()
+        results = {}
+
+        for key in self._batch_row_update:
+            uid, access_token, ip = key
+            if uid == user_id:
+                user_agent, _, last_seen = self._batch_row_update[key]
+                results[(access_token, ip)] = (user_agent, last_seen)
+
+        rows = yield self._simple_select_list(
+            table="user_ips",
+            keyvalues={"user_id": user_id},
+            retcols=[
+                "access_token", "ip", "user_agent", "last_seen"
+            ],
+            desc="get_user_ip_and_agents",
+        )
+
+        results.update(
+            ((row["access_token"], row["ip"]), (row["user_agent"], row["last_seen"]))
+            for row in rows
+        )
+        defer.returnValue(list(
+            {
+                "access_token": access_token,
+                "ip": ip,
+                "user_agent": user_agent,
+                "last_seen": last_seen,
+            }
+            for (access_token, ip), (user_agent, last_seen) in results.iteritems()
+        ))
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d9936c88bb..bb27fd1f70 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -368,7 +368,7 @@ class DeviceStore(SQLBaseStore):
 
         prev_sent_id_sql = """
             SELECT coalesce(max(stream_id), 0) as stream_id
-            FROM device_lists_outbound_pokes
+            FROM device_lists_outbound_last_success
             WHERE destination = ? AND user_id = ? AND stream_id <= ?
         """
 
@@ -510,32 +510,43 @@ class DeviceStore(SQLBaseStore):
         )
 
     def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id):
-        # First we DELETE all rows such that only the latest row for each
-        # (destination, user_id is left. We do this by selecting first and
-        # deleting.
+        # We update the device_lists_outbound_last_success with the successfully
+        # poked users. We do the join to see which users need to be inserted and
+        # which updated.
         sql = """
-            SELECT user_id, coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes
-            WHERE destination = ? AND stream_id <= ?
+            SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL)
+            FROM device_lists_outbound_pokes as o
+            LEFT JOIN device_lists_outbound_last_success as s
+                USING (destination, user_id)
+            WHERE destination = ? AND o.stream_id <= ?
             GROUP BY user_id
-            HAVING count(*) > 1
         """
         txn.execute(sql, (destination, stream_id,))
         rows = txn.fetchall()
 
         sql = """
-            DELETE FROM device_lists_outbound_pokes
-            WHERE destination = ? AND user_id = ? AND stream_id < ?
+            UPDATE device_lists_outbound_last_success
+            SET stream_id = ?
+            WHERE destination = ? AND user_id = ?
         """
         txn.executemany(
-            sql, ((destination, row[0], row[1],) for row in rows)
+            sql, ((row[1], destination, row[0],) for row in rows if row[2])
         )
 
-        # Mark everything that is left as sent
         sql = """
-            UPDATE device_lists_outbound_pokes SET sent = ?
+            INSERT INTO device_lists_outbound_last_success
+            (destination, user_id, stream_id) VALUES (?, ?, ?)
+        """
+        txn.executemany(
+            sql, ((destination, row[0], row[1],) for row in rows if not row[2])
+        )
+
+        # Delete all sent outbound pokes
+        sql = """
+            DELETE FROM device_lists_outbound_pokes
             WHERE destination = ? AND stream_id <= ?
         """
-        txn.execute(sql, (True, destination, stream_id,))
+        txn.execute(sql, (destination, stream_id,))
 
     @defer.inlineCallbacks
     def get_user_whose_devices_changed(self, from_key):
@@ -670,6 +681,14 @@ class DeviceStore(SQLBaseStore):
                 )
             )
 
+            # Since we've deleted unsent deltas, we need to remove the entry
+            # of last successful sent so that the prev_ids are correctly set.
+            sql = """
+                DELETE FROM device_lists_outbound_last_success
+                WHERE destination = ? AND user_id = ?
+            """
+            txn.executemany(sql, ((row[0], row[1]) for row in rows))
+
             logger.info("Pruned %d device list outbound pokes", txn.rowcount)
 
         return self.runInteraction(
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 9caaf81f2c..79e7c540ad 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -170,3 +170,17 @@ class DirectoryStore(SQLBaseStore):
             "room_alias",
             desc="get_aliases_for_room",
         )
+
+    def update_aliases_for_room(self, old_room_id, new_room_id, creator):
+        def _update_aliases_for_room_txn(txn):
+            sql = "UPDATE room_aliases SET room_id = ?, creator = ? WHERE room_id = ?"
+            txn.execute(sql, (new_room_id, creator, old_room_id,))
+            self._invalidate_cache_and_stream(
+                txn, self.get_aliases_for_room, (old_room_id,)
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.get_aliases_for_room, (new_room_id,)
+            )
+        return self.runInteraction(
+            "_update_aliases_for_room_txn", _update_aliases_for_room_txn
+        )
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 528f19eb87..7002b3752e 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -38,7 +38,6 @@ from functools import wraps
 import synapse.metrics
 
 import logging
-import math
 import ujson as json
 
 # these are only included to make the type annotations work
@@ -404,6 +403,11 @@ class EventsStore(SQLBaseStore):
                         (room_id, ), new_state
                     )
 
+                for room_id, latest_event_ids in new_forward_extremeties.iteritems():
+                    self.get_latest_event_ids_in_room.prefill(
+                        (room_id,), list(latest_event_ids)
+                    )
+
     @defer.inlineCallbacks
     def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
         """Calculates the new forward extremeties for a room given events to
@@ -776,6 +780,11 @@ class EventsStore(SQLBaseStore):
                         txn, self.get_rooms_for_user, (member,)
                     )
 
+                for host in set(get_domain_from_id(u) for u in members_changed):
+                    self._invalidate_cache_and_stream(
+                        txn, self.is_host_joined, (room_id, host)
+                    )
+
                 self._invalidate_cache_and_stream(
                     txn, self.get_users_in_room, (room_id,)
                 )
@@ -1440,7 +1449,7 @@ class EventsStore(SQLBaseStore):
                 ]
 
                 rows = self._new_transaction(
-                    conn, "do_fetch", [], None, self._fetch_event_rows, event_ids
+                    conn, "do_fetch", [], [], None, self._fetch_event_rows, event_ids
                 )
 
                 row_dict = {
@@ -1620,66 +1629,52 @@ class EventsStore(SQLBaseStore):
         call to this function, it will return None.
         """
         def _count_messages(txn):
-            now = self.hs.get_clock().time()
-
-            txn.execute(
-                "SELECT reported_stream_token, reported_time FROM stats_reporting"
-            )
-            last_reported = self.cursor_to_dict(txn)
+            sql = """
+                SELECT COALESCE(COUNT(*), 0) FROM events
+                WHERE type = 'm.room.message'
+                AND stream_ordering > ?
+            """
+            txn.execute(sql, (self.stream_ordering_day_ago,))
+            count, = txn.fetchone()
+            return count
 
-            txn.execute(
-                "SELECT stream_ordering"
-                " FROM events"
-                " ORDER BY stream_ordering DESC"
-                " LIMIT 1"
-            )
-            now_reporting = self.cursor_to_dict(txn)
-            if not now_reporting:
-                logger.info("Calculating daily messages skipped; no now_reporting")
-                return None
-            now_reporting = now_reporting[0]["stream_ordering"]
-
-            txn.execute("DELETE FROM stats_reporting")
-            txn.execute(
-                "INSERT INTO stats_reporting"
-                " (reported_stream_token, reported_time)"
-                " VALUES (?, ?)",
-                (now_reporting, now,)
-            )
+        ret = yield self.runInteraction("count_messages", _count_messages)
+        defer.returnValue(ret)
 
-            if not last_reported:
-                logger.info("Calculating daily messages skipped; no last_reported")
-                return None
-
-            # Close enough to correct for our purposes.
-            yesterday = (now - 24 * 60 * 60)
-            since_yesterday_seconds = yesterday - last_reported[0]["reported_time"]
-            any_since_yesterday = math.fabs(since_yesterday_seconds) > 60 * 60
-            if any_since_yesterday:
-                logger.info(
-                    "Calculating daily messages skipped; since_yesterday_seconds: %d" %
-                    (since_yesterday_seconds,)
-                )
-                return None
-
-            txn.execute(
-                "SELECT COUNT(*) as messages"
-                " FROM events NATURAL JOIN event_json"
-                " WHERE json like '%m.room.message%'"
-                " AND stream_ordering > ?"
-                " AND stream_ordering <= ?",
-                (
-                    last_reported[0]["reported_stream_token"],
-                    now_reporting,
-                )
-            )
-            rows = self.cursor_to_dict(txn)
-            if not rows:
-                logger.info("Calculating daily messages skipped; messages count missing")
-                return None
-            return rows[0]["messages"]
+    @defer.inlineCallbacks
+    def count_daily_sent_messages(self):
+        def _count_messages(txn):
+            # This is good enough as if you have silly characters in your own
+            # hostname then thats your own fault.
+            like_clause = "%:" + self.hs.hostname
+
+            sql = """
+                SELECT COALESCE(COUNT(*), 0) FROM events
+                WHERE type = 'm.room.message'
+                    AND sender LIKE ?
+                AND stream_ordering > ?
+            """
+
+            txn.execute(sql, (like_clause, self.stream_ordering_day_ago,))
+            count, = txn.fetchone()
+            return count
+
+        ret = yield self.runInteraction("count_daily_sent_messages", _count_messages)
+        defer.returnValue(ret)
 
-        ret = yield self.runInteraction("count_messages", _count_messages)
+    @defer.inlineCallbacks
+    def count_daily_active_rooms(self):
+        def _count(txn):
+            sql = """
+                SELECT COALESCE(COUNT(DISTINCT room_id), 0) FROM events
+                WHERE type = 'm.room.message'
+                AND stream_ordering > ?
+            """
+            txn.execute(sql, (self.stream_ordering_day_ago,))
+            count, = txn.fetchone()
+            return count
+
+        ret = yield self.runInteraction("count_daily_active_rooms", _count)
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
@@ -2279,6 +2274,24 @@ class EventsStore(SQLBaseStore):
 
         defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"])))
 
+    def get_max_current_state_delta_stream_id(self):
+        return self._stream_id_gen.get_current_token()
+
+    def get_all_updated_current_state_deltas(self, from_token, to_token, limit):
+        def get_all_updated_current_state_deltas_txn(txn):
+            sql = """
+                SELECT stream_id, room_id, type, state_key, event_id
+                FROM current_state_delta_stream
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC LIMIT ?
+            """
+            txn.execute(sql, (from_token, to_token, limit))
+            return txn.fetchall()
+        return self.runInteraction(
+            "get_all_updated_current_state_deltas",
+            get_all_updated_current_state_deltas_txn,
+        )
+
 
 AllNewEventsResult = namedtuple("AllNewEventsResult", [
     "new_forward_events", "new_backfill_events",
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index a2ccc66ea7..78b1e30945 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -19,6 +19,7 @@ from ._base import SQLBaseStore
 from synapse.api.errors import SynapseError, Codes
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 
+from canonicaljson import encode_canonical_json
 import simplejson as json
 
 
@@ -46,12 +47,21 @@ class FilteringStore(SQLBaseStore):
         defer.returnValue(json.loads(str(def_json).decode("utf-8")))
 
     def add_user_filter(self, user_localpart, user_filter):
-        def_json = json.dumps(user_filter).encode("utf-8")
+        def_json = encode_canonical_json(user_filter)
 
         # Need an atomic transaction to SELECT the maximal ID so far then
         # INSERT a new one
         def _do_txn(txn):
             sql = (
+                "SELECT filter_id FROM user_filters "
+                "WHERE user_id = ? AND filter_json = ?"
+            )
+            txn.execute(sql, (user_localpart, def_json))
+            filter_id_response = txn.fetchone()
+            if filter_id_response is not None:
+                return filter_id_response[0]
+
+            sql = (
                 "SELECT MAX(filter_id) FROM user_filters "
                 "WHERE user_id = ?"
             )
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 4c0f82353d..82bb61b811 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -30,13 +30,16 @@ class MediaRepositoryStore(SQLBaseStore):
         return self._simple_select_one(
             "local_media_repository",
             {"media_id": media_id},
-            ("media_type", "media_length", "upload_name", "created_ts"),
+            (
+                "media_type", "media_length", "upload_name", "created_ts",
+                "quarantined_by", "url_cache",
+            ),
             allow_none=True,
             desc="get_local_media",
         )
 
     def store_local_media(self, media_id, media_type, time_now_ms, upload_name,
-                          media_length, user_id):
+                          media_length, user_id, url_cache=None):
         return self._simple_insert(
             "local_media_repository",
             {
@@ -46,6 +49,7 @@ class MediaRepositoryStore(SQLBaseStore):
                 "upload_name": upload_name,
                 "media_length": media_length,
                 "user_id": user_id.to_string(),
+                "url_cache": url_cache,
             },
             desc="store_local_media",
         )
@@ -138,7 +142,7 @@ class MediaRepositoryStore(SQLBaseStore):
             {"media_origin": origin, "media_id": media_id},
             (
                 "media_type", "media_length", "upload_name", "created_ts",
-                "filesystem_id",
+                "filesystem_id", "quarantined_by",
             ),
             allow_none=True,
             desc="get_cached_remote_media",
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index eaba699e29..72b670b83b 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 42
+SCHEMA_VERSION = 43
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index ec2c52ab93..20acd58fcf 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -438,6 +438,19 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
+    def count_nonbridged_users(self):
+        def _count_users(txn):
+            txn.execute("""
+                SELECT COALESCE(COUNT(*), 0) FROM users
+                WHERE appservice_id IS NULL
+            """)
+            count, = txn.fetchone()
+            return count
+
+        ret = yield self.runInteraction("count_users", _count_users)
+        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
     def find_next_generated_user_id_localpart(self):
         """
         Gets the localpart of the next generated user ID.
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 5d543652bb..23688430b7 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -24,6 +24,7 @@ from .engines import PostgresEngine, Sqlite3Engine
 import collections
 import logging
 import ujson as json
+import re
 
 logger = logging.getLogger(__name__)
 
@@ -507,3 +508,98 @@ class RoomStore(SQLBaseStore):
             ))
         else:
             defer.returnValue(None)
+
+    @cached(max_entries=10000)
+    def is_room_blocked(self, room_id):
+        return self._simple_select_one_onecol(
+            table="blocked_rooms",
+            keyvalues={
+                "room_id": room_id,
+            },
+            retcol="1",
+            allow_none=True,
+            desc="is_room_blocked",
+        )
+
+    @defer.inlineCallbacks
+    def block_room(self, room_id, user_id):
+        yield self._simple_insert(
+            table="blocked_rooms",
+            values={
+                "room_id": room_id,
+                "user_id": user_id,
+            },
+            desc="block_room",
+        )
+        self.is_room_blocked.invalidate((room_id,))
+
+    def quarantine_media_ids_in_room(self, room_id, quarantined_by):
+        """For a room loops through all events with media and quarantines
+        the associated media
+        """
+        def _get_media_ids_in_room(txn):
+            mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")
+
+            next_token = self.get_current_events_token() + 1
+
+            total_media_quarantined = 0
+
+            while next_token:
+                sql = """
+                    SELECT stream_ordering, content FROM events
+                    WHERE room_id = ?
+                        AND stream_ordering < ?
+                        AND contains_url = ? AND outlier = ?
+                    ORDER BY stream_ordering DESC
+                    LIMIT ?
+                """
+                txn.execute(sql, (room_id, next_token, True, False, 100))
+
+                next_token = None
+                local_media_mxcs = []
+                remote_media_mxcs = []
+                for stream_ordering, content_json in txn:
+                    next_token = stream_ordering
+                    content = json.loads(content_json)
+
+                    content_url = content.get("url")
+                    thumbnail_url = content.get("info", {}).get("thumbnail_url")
+
+                    for url in (content_url, thumbnail_url):
+                        if not url:
+                            continue
+                        matches = mxc_re.match(url)
+                        if matches:
+                            hostname = matches.group(1)
+                            media_id = matches.group(2)
+                            if hostname == self.hostname:
+                                local_media_mxcs.append(media_id)
+                            else:
+                                remote_media_mxcs.append((hostname, media_id))
+
+                # Now update all the tables to set the quarantined_by flag
+
+                txn.executemany("""
+                    UPDATE local_media_repository
+                    SET quarantined_by = ?
+                    WHERE media_id = ?
+                """, ((quarantined_by, media_id) for media_id in local_media_mxcs))
+
+                txn.executemany(
+                    """
+                        UPDATE remote_media_cache
+                        SET quarantined_by = ?
+                        WHERE media_origin AND media_id = ?
+                    """,
+                    (
+                        (quarantined_by, origin, media_id)
+                        for origin, media_id in remote_media_mxcs
+                    )
+                )
+
+                total_media_quarantined += len(local_media_mxcs)
+                total_media_quarantined += len(remote_media_mxcs)
+
+            return total_media_quarantined
+
+        return self.runInteraction("get_media_ids_in_room", _get_media_ids_in_room)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 0829ae5bee..457ca288d0 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from collections import namedtuple
 
 from ._base import SQLBaseStore
+from synapse.util.async import Linearizer
 from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.stringutils import to_ascii
@@ -392,7 +393,8 @@ class RoomMemberStore(SQLBaseStore):
             context=context,
         )
 
-    def get_joined_users_from_state(self, room_id, state_group, state_ids):
+    def get_joined_users_from_state(self, room_id, state_entry):
+        state_group = state_entry.state_group
         if not state_group:
             # If state_group is None it means it has yet to be assigned a
             # state group, i.e. we need to make sure that calls with a state_group
@@ -401,7 +403,7 @@ class RoomMemberStore(SQLBaseStore):
             state_group = object()
 
         return self._get_joined_users_from_context(
-            room_id, state_group, state_ids,
+            room_id, state_group, state_entry.state, context=state_entry,
         )
 
     @cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True,
@@ -499,42 +501,40 @@ class RoomMemberStore(SQLBaseStore):
 
         defer.returnValue(users_in_room)
 
-    def is_host_joined(self, room_id, host, state_group, state_ids):
-        if not state_group:
-            # If state_group is None it means it has yet to be assigned a
-            # state group, i.e. we need to make sure that calls with a state_group
-            # of None don't hit previous cached calls with a None state_group.
-            # To do this we set the state_group to a new object as object() != object()
-            state_group = object()
+    @cachedInlineCallbacks(max_entries=10000)
+    def is_host_joined(self, room_id, host):
+        if '%' in host or '_' in host:
+            raise Exception("Invalid host name")
 
-        return self._is_host_joined(
-            room_id, host, state_group, state_ids
-        )
+        sql = """
+            SELECT state_key FROM current_state_events AS c
+            INNER JOIN room_memberships USING (event_id)
+            WHERE membership = 'join'
+                AND type = 'm.room.member'
+                AND c.room_id = ?
+                AND state_key LIKE ?
+            LIMIT 1
+        """
 
-    @cachedInlineCallbacks(num_args=3)
-    def _is_host_joined(self, room_id, host, state_group, current_state_ids):
-        # We don't use `state_group`, its there so that we can cache based
-        # on it. However, its important that its never None, since two current_state's
-        # with a state_group of None are likely to be different.
-        # See bulk_get_push_rules_for_room for how we work around this.
-        assert state_group is not None
+        # We do need to be careful to ensure that host doesn't have any wild cards
+        # in it, but we checked above for known ones and we'll check below that
+        # the returned user actually has the correct domain.
+        like_clause = "%:" + host
 
-        for (etype, state_key), event_id in current_state_ids.items():
-            if etype == EventTypes.Member:
-                try:
-                    if get_domain_from_id(state_key) != host:
-                        continue
-                except:
-                    logger.warn("state_key not user_id: %s", state_key)
-                    continue
+        rows = yield self._execute("is_host_joined", None, sql, room_id, like_clause)
 
-                event = yield self.get_event(event_id, allow_none=True)
-                if event and event.content["membership"] == Membership.JOIN:
-                    defer.returnValue(True)
+        if not rows:
+            defer.returnValue(False)
 
-        defer.returnValue(False)
+        user_id = rows[0][0]
+        if get_domain_from_id(user_id) != host:
+            # This can only happen if the host name has something funky in it
+            raise Exception("Invalid host name")
 
-    def get_joined_hosts(self, room_id, state_group, state_ids):
+        defer.returnValue(True)
+
+    def get_joined_hosts(self, room_id, state_entry):
+        state_group = state_entry.state_group
         if not state_group:
             # If state_group is None it means it has yet to be assigned a
             # state group, i.e. we need to make sure that calls with a state_group
@@ -543,33 +543,20 @@ class RoomMemberStore(SQLBaseStore):
             state_group = object()
 
         return self._get_joined_hosts(
-            room_id, state_group, state_ids
+            room_id, state_group, state_entry.state, state_entry=state_entry,
         )
 
     @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
-    def _get_joined_hosts(self, room_id, state_group, current_state_ids):
+    # @defer.inlineCallbacks
+    def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry):
         # We don't use `state_group`, its there so that we can cache based
         # on it. However, its important that its never None, since two current_state's
         # with a state_group of None are likely to be different.
         # See bulk_get_push_rules_for_room for how we work around this.
         assert state_group is not None
 
-        joined_hosts = set()
-        for etype, state_key in current_state_ids:
-            if etype == EventTypes.Member:
-                try:
-                    host = get_domain_from_id(state_key)
-                except:
-                    logger.warn("state_key not user_id: %s", state_key)
-                    continue
-
-                if host in joined_hosts:
-                    continue
-
-                event_id = current_state_ids[(etype, state_key)]
-                event = yield self.get_event(event_id, allow_none=True)
-                if event and event.content["membership"] == Membership.JOIN:
-                    joined_hosts.add(intern_string(host))
+        cache = self._get_joined_hosts_cache(room_id)
+        joined_hosts = yield cache.get_destinations(state_entry)
 
         defer.returnValue(joined_hosts)
 
@@ -647,3 +634,75 @@ class RoomMemberStore(SQLBaseStore):
             yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME)
 
         defer.returnValue(result)
+
+    @cached(max_entries=10000, iterable=True)
+    def _get_joined_hosts_cache(self, room_id):
+        return _JoinedHostsCache(self, room_id)
+
+
+class _JoinedHostsCache(object):
+    """Cache for joined hosts in a room that is optimised to handle updates
+    via state deltas.
+    """
+
+    def __init__(self, store, room_id):
+        self.store = store
+        self.room_id = room_id
+
+        self.hosts_to_joined_users = {}
+
+        self.state_group = object()
+
+        self.linearizer = Linearizer("_JoinedHostsCache")
+
+        self._len = 0
+
+    @defer.inlineCallbacks
+    def get_destinations(self, state_entry):
+        """Get set of destinations for a state entry
+
+        Args:
+            state_entry(synapse.state._StateCacheEntry)
+        """
+        if state_entry.state_group == self.state_group:
+            defer.returnValue(frozenset(self.hosts_to_joined_users))
+
+        with (yield self.linearizer.queue(())):
+            if state_entry.state_group == self.state_group:
+                pass
+            elif state_entry.prev_group == self.state_group:
+                for (typ, state_key), event_id in state_entry.delta_ids.iteritems():
+                    if typ != EventTypes.Member:
+                        continue
+
+                    host = intern_string(get_domain_from_id(state_key))
+                    user_id = state_key
+                    known_joins = self.hosts_to_joined_users.setdefault(host, set())
+
+                    event = yield self.store.get_event(event_id)
+                    if event.membership == Membership.JOIN:
+                        known_joins.add(user_id)
+                    else:
+                        known_joins.discard(user_id)
+
+                        if not known_joins:
+                            self.hosts_to_joined_users.pop(host, None)
+            else:
+                joined_users = yield self.store.get_joined_users_from_state(
+                    self.room_id, state_entry,
+                )
+
+                self.hosts_to_joined_users = {}
+                for user_id in joined_users:
+                    host = intern_string(get_domain_from_id(user_id))
+                    self.hosts_to_joined_users.setdefault(host, set()).add(user_id)
+
+            if state_entry.state_group:
+                self.state_group = state_entry.state_group
+            else:
+                self.state_group = object()
+            self._len = sum(len(v) for v in self.hosts_to_joined_users.itervalues())
+        defer.returnValue(frozenset(self.hosts_to_joined_users))
+
+    def __len__(self):
+        return self._len
diff --git a/synapse/storage/schema/delta/42/device_list_last_id.sql b/synapse/storage/schema/delta/42/device_list_last_id.sql
new file mode 100644
index 0000000000..9ab8c14fa3
--- /dev/null
+++ b/synapse/storage/schema/delta/42/device_list_last_id.sql
@@ -0,0 +1,33 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- Table of last stream_id that we sent to destination for user_id. This is
+-- used to fill out the `prev_id` fields of outbound device list updates.
+CREATE TABLE device_lists_outbound_last_success (
+    destination TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    stream_id BIGINT NOT NULL
+);
+
+INSERT INTO device_lists_outbound_last_success
+    SELECT destination, user_id, coalesce(max(stream_id), 0) as stream_id
+        FROM device_lists_outbound_pokes
+        WHERE sent = (1 = 1)  -- sqlite doesn't have inbuilt boolean values
+        GROUP BY destination, user_id;
+
+CREATE INDEX device_lists_outbound_last_success_idx ON device_lists_outbound_last_success(
+    destination, user_id, stream_id
+);
diff --git a/synapse/storage/schema/delta/43/blocked_rooms.sql b/synapse/storage/schema/delta/43/blocked_rooms.sql
new file mode 100644
index 0000000000..0e3cd143ff
--- /dev/null
+++ b/synapse/storage/schema/delta/43/blocked_rooms.sql
@@ -0,0 +1,21 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE blocked_rooms (
+    room_id TEXT NOT NULL,
+    user_id TEXT NOT NULL  -- Admin who blocked the room
+);
+
+CREATE UNIQUE INDEX blocked_rooms_idx ON blocked_rooms(room_id);
diff --git a/synapse/storage/schema/delta/43/quarantine_media.sql b/synapse/storage/schema/delta/43/quarantine_media.sql
new file mode 100644
index 0000000000..630907ec4f
--- /dev/null
+++ b/synapse/storage/schema/delta/43/quarantine_media.sql
@@ -0,0 +1,17 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE local_media_repository ADD COLUMN quarantined_by TEXT;
+ALTER TABLE remote_media_cache ADD COLUMN quarantined_by TEXT;
diff --git a/synapse/storage/schema/delta/43/url_cache.sql b/synapse/storage/schema/delta/43/url_cache.sql
new file mode 100644
index 0000000000..45ebe020da
--- /dev/null
+++ b/synapse/storage/schema/delta/43/url_cache.sql
@@ -0,0 +1,16 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE local_media_repository ADD COLUMN url_cache TEXT;
diff --git a/synapse/storage/schema/delta/43/user_share.sql b/synapse/storage/schema/delta/43/user_share.sql
new file mode 100644
index 0000000000..4501d90cbb
--- /dev/null
+++ b/synapse/storage/schema/delta/43/user_share.sql
@@ -0,0 +1,33 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Table keeping track of who shares a room with who. We only keep track
+-- of this for local users, so `user_id` is local users only (but we do keep track
+-- of which remote users share a room)
+CREATE TABLE users_who_share_rooms (
+    user_id TEXT NOT NULL,
+    other_user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    share_private BOOLEAN NOT NULL  -- is the shared room private? i.e. they share a private room
+);
+
+
+CREATE UNIQUE INDEX users_who_share_rooms_u_idx ON users_who_share_rooms(user_id, other_user_id);
+CREATE INDEX users_who_share_rooms_r_idx ON users_who_share_rooms(room_id);
+CREATE INDEX users_who_share_rooms_o_idx ON users_who_share_rooms(other_user_id);
+
+
+-- Make sure that we popualte the table initially
+UPDATE user_directory_stream_pos SET stream_id = NULL;
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index a7c3d401d4..5673e4aa96 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -20,6 +20,7 @@ from synapse.util.stringutils import to_ascii
 from synapse.storage.engines import PostgresEngine
 
 from twisted.internet import defer
+from collections import namedtuple
 
 import logging
 
@@ -29,6 +30,16 @@ logger = logging.getLogger(__name__)
 MAX_STATE_DELTA_HOPS = 100
 
 
+class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delta_ids"))):
+    """Return type of get_state_group_delta that implements __len__, which lets
+    us use the itrable flag when caching
+    """
+    __slots__ = []
+
+    def __len__(self):
+        return len(self.delta_ids) if self.delta_ids else 0
+
+
 class StateStore(SQLBaseStore):
     """ Keeps track of the state at a given event.
 
@@ -98,6 +109,46 @@ class StateStore(SQLBaseStore):
             _get_current_state_ids_txn,
         )
 
+    @cached(max_entries=10000, iterable=True)
+    def get_state_group_delta(self, state_group):
+        """Given a state group try to return a previous group and a delta between
+        the old and the new.
+
+        Returns:
+            (prev_group, delta_ids), where both may be None.
+        """
+        def _get_state_group_delta_txn(txn):
+            prev_group = self._simple_select_one_onecol_txn(
+                txn,
+                table="state_group_edges",
+                keyvalues={
+                    "state_group": state_group,
+                },
+                retcol="prev_state_group",
+                allow_none=True,
+            )
+
+            if not prev_group:
+                return _GetStateGroupDelta(None, None)
+
+            delta_ids = self._simple_select_list_txn(
+                txn,
+                table="state_groups_state",
+                keyvalues={
+                    "state_group": state_group,
+                },
+                retcols=("type", "state_key", "event_id",)
+            )
+
+            return _GetStateGroupDelta(prev_group, {
+                (row["type"], row["state_key"]): row["event_id"]
+                for row in delta_ids
+            })
+        return self.runInteraction(
+            "get_state_group_delta",
+            _get_state_group_delta_txn,
+        )
+
     @defer.inlineCallbacks
     def get_state_groups_ids(self, room_id, event_ids):
         if not event_ids:
@@ -184,6 +235,19 @@ class StateStore(SQLBaseStore):
             # We persist as a delta if we can, while also ensuring the chain
             # of deltas isn't tooo long, as otherwise read performance degrades.
             if context.prev_group:
+                is_in_db = self._simple_select_one_onecol_txn(
+                    txn,
+                    table="state_groups",
+                    keyvalues={"id": context.prev_group},
+                    retcol="id",
+                    allow_none=True,
+                )
+                if not is_in_db:
+                    raise Exception(
+                        "Trying to persist state with unpersisted prev_group: %r"
+                        % (context.prev_group,)
+                    )
+
                 potential_hops = self._count_state_group_hops_txn(
                     txn, context.prev_group
                 )
@@ -251,6 +315,12 @@ class StateStore(SQLBaseStore):
             ],
         )
 
+        for event_id, state_group_id in state_groups.iteritems():
+            txn.call_after(
+                self._get_state_group_for_event.prefill,
+                (event_id,), state_group_id
+            )
+
     def _count_state_group_hops_txn(self, txn, state_group):
         """Given a state group, count how many hops there are in the tree.
 
@@ -520,8 +590,8 @@ class StateStore(SQLBaseStore):
         state_map = yield self.get_state_ids_for_events([event_id], types)
         defer.returnValue(state_map[event_id])
 
-    @cached(num_args=2, max_entries=50000)
-    def _get_state_group_for_event(self, room_id, event_id):
+    @cached(max_entries=50000)
+    def _get_state_group_for_event(self, event_id):
         return self._simple_select_one_onecol(
             table="event_to_state_groups",
             keyvalues={
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 6a4bf63f0d..2a4db3f03c 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -16,16 +16,19 @@
 from twisted.internet import defer
 
 from ._base import SQLBaseStore
+
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.api.constants import EventTypes, JoinRules
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import get_domain_from_id, get_localpart_from_id
 
 import re
+import logging
 
+logger = logging.getLogger(__name__)
 
-class UserDirectoryStore(SQLBaseStore):
 
+class UserDirectoryStore(SQLBaseStore):
     @cachedInlineCallbacks(cache_context=True)
     def is_room_world_readable_or_publicly_joinable(self, room_id, cache_context):
         """Check if the room is either world_readable or publically joinable
@@ -149,46 +152,64 @@ class UserDirectoryStore(SQLBaseStore):
         )
         self.get_user_in_directory.invalidate((user_id,))
 
-    def update_profile_in_user_dir(self, user_id, display_name, avatar_url):
+    def update_profile_in_user_dir(self, user_id, display_name, avatar_url, room_id):
         def _update_profile_in_user_dir_txn(txn):
-            self._simple_update_one_txn(
+            new_entry = self._simple_upsert_txn(
                 txn,
                 table="user_directory",
                 keyvalues={"user_id": user_id},
-                updatevalues={"display_name": display_name, "avatar_url": avatar_url},
+                insertion_values={"room_id": room_id},
+                values={"display_name": display_name, "avatar_url": avatar_url},
+                lock=False,  # We're only inserter
             )
 
             if isinstance(self.database_engine, PostgresEngine):
                 # We weight the loclpart most highly, then display name and finally
                 # server name
-                sql = """
-                    UPDATE user_directory_search
-                    SET vector = setweight(to_tsvector('english', ?), 'A')
-                        || setweight(to_tsvector('english', ?), 'D')
-                        || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
-                    WHERE user_id = ?
-                """
-                args = (
-                    get_localpart_from_id(user_id), get_domain_from_id(user_id),
-                    display_name,
-                    user_id,
-                )
+                if new_entry:
+                    sql = """
+                        INSERT INTO user_directory_search(user_id, vector)
+                        VALUES (?,
+                            setweight(to_tsvector('english', ?), 'A')
+                            || setweight(to_tsvector('english', ?), 'D')
+                            || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+                        )
+                    """
+                    txn.execute(
+                        sql,
+                        (
+                            user_id, get_localpart_from_id(user_id),
+                            get_domain_from_id(user_id), display_name,
+                        )
+                    )
+                else:
+                    sql = """
+                        UPDATE user_directory_search
+                        SET vector = setweight(to_tsvector('english', ?), 'A')
+                            || setweight(to_tsvector('english', ?), 'D')
+                            || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+                        WHERE user_id = ?
+                    """
+                    txn.execute(
+                        sql,
+                        (
+                            get_localpart_from_id(user_id), get_domain_from_id(user_id),
+                            display_name, user_id,
+                        )
+                    )
             elif isinstance(self.database_engine, Sqlite3Engine):
-                sql = """
-                    UPDATE user_directory_search
-                    set value = ?
-                    WHERE user_id = ?
-                """
-                args = (
-                    "%s %s" % (user_id, display_name,) if display_name else user_id,
-                    user_id,
+                value = "%s %s" % (user_id, display_name,) if display_name else user_id
+                self._simple_upsert_txn(
+                    txn,
+                    table="user_directory_search",
+                    keyvalues={"user_id": user_id},
+                    values={"value": value},
+                    lock=False,  # We're only inserter
                 )
             else:
                 # This should be unreachable.
                 raise Exception("Unrecognized database engine")
 
-            txn.execute(sql, args)
-
             txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
 
         return self.runInteraction(
@@ -252,27 +273,241 @@ class UserDirectoryStore(SQLBaseStore):
             desc="get_users_in_public_due_to_room",
         )
 
+    @defer.inlineCallbacks
     def get_users_in_dir_due_to_room(self, room_id):
         """Get all user_ids that are in the room directory becuase they're
         in the given room_id
         """
-        return self._simple_select_onecol(
+        user_ids_dir = yield self._simple_select_onecol(
             table="user_directory",
             keyvalues={"room_id": room_id},
             retcol="user_id",
             desc="get_users_in_dir_due_to_room",
         )
 
+        user_ids_pub = yield self._simple_select_onecol(
+            table="users_in_pubic_room",
+            keyvalues={"room_id": room_id},
+            retcol="user_id",
+            desc="get_users_in_dir_due_to_room",
+        )
+
+        user_ids_share = yield self._simple_select_onecol(
+            table="users_who_share_rooms",
+            keyvalues={"room_id": room_id},
+            retcol="user_id",
+            desc="get_users_in_dir_due_to_room",
+        )
+
+        user_ids = set(user_ids_dir)
+        user_ids.update(user_ids_pub)
+        user_ids.update(user_ids_share)
+
+        defer.returnValue(user_ids)
+
+    @defer.inlineCallbacks
     def get_all_rooms(self):
-        """Get all room_ids we've ever known about
+        """Get all room_ids we've ever known about, in ascending order of "size"
         """
-        return self._simple_select_onecol(
-            table="current_state_events",
-            keyvalues={},
-            retcol="DISTINCT room_id",
-            desc="get_all_rooms",
+        sql = """
+            SELECT room_id FROM current_state_events
+            GROUP BY room_id
+            ORDER BY count(*) ASC
+        """
+        rows = yield self._execute("get_all_rooms", None, sql)
+        defer.returnValue([room_id for room_id, in rows])
+
+    def add_users_who_share_room(self, room_id, share_private, user_id_tuples):
+        """Insert entries into the users_who_share_rooms table. The first
+        user should be a local user.
+
+        Args:
+            room_id (str)
+            share_private (bool): Is the room private
+            user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
+        """
+        def _add_users_who_share_room_txn(txn):
+            self._simple_insert_many_txn(
+                txn,
+                table="users_who_share_rooms",
+                values=[
+                    {
+                        "user_id": user_id,
+                        "other_user_id": other_user_id,
+                        "room_id": room_id,
+                        "share_private": share_private,
+                    }
+                    for user_id, other_user_id in user_id_tuples
+                ],
+            )
+            for user_id, other_user_id in user_id_tuples:
+                txn.call_after(
+                    self.get_users_who_share_room_from_dir.invalidate,
+                    (user_id,),
+                )
+                txn.call_after(
+                    self.get_if_users_share_a_room.invalidate,
+                    (user_id, other_user_id),
+                )
+        return self.runInteraction(
+            "add_users_who_share_room", _add_users_who_share_room_txn
+        )
+
+    def update_users_who_share_room(self, room_id, share_private, user_id_sets):
+        """Updates entries in the users_who_share_rooms table. The first
+        user should be a local user.
+
+        Args:
+            room_id (str)
+            share_private (bool): Is the room private
+            user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
+        """
+        def _update_users_who_share_room_txn(txn):
+            sql = """
+                UPDATE users_who_share_rooms
+                SET room_id = ?, share_private = ?
+                WHERE user_id = ? AND other_user_id = ?
+            """
+            txn.executemany(
+                sql,
+                (
+                    (room_id, share_private, uid, oid)
+                    for uid, oid in user_id_sets
+                )
+            )
+            for user_id, other_user_id in user_id_sets:
+                txn.call_after(
+                    self.get_users_who_share_room_from_dir.invalidate,
+                    (user_id,),
+                )
+                txn.call_after(
+                    self.get_if_users_share_a_room.invalidate,
+                    (user_id, other_user_id),
+                )
+        return self.runInteraction(
+            "update_users_who_share_room", _update_users_who_share_room_txn
+        )
+
+    def remove_user_who_share_room(self, user_id, other_user_id):
+        """Deletes entries in the users_who_share_rooms table. The first
+        user should be a local user.
+
+        Args:
+            room_id (str)
+            share_private (bool): Is the room private
+            user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs.
+        """
+        def _remove_user_who_share_room_txn(txn):
+            self._simple_delete_txn(
+                txn,
+                table="users_who_share_rooms",
+                keyvalues={
+                    "user_id": user_id,
+                    "other_user_id": other_user_id,
+                },
+            )
+            txn.call_after(
+                self.get_users_who_share_room_from_dir.invalidate,
+                (user_id,),
+            )
+            txn.call_after(
+                self.get_if_users_share_a_room.invalidate,
+                (user_id, other_user_id),
+            )
+
+        return self.runInteraction(
+            "remove_user_who_share_room", _remove_user_who_share_room_txn
+        )
+
+    @cached(max_entries=500000)
+    def get_if_users_share_a_room(self, user_id, other_user_id):
+        """Gets if users share a room.
+
+        Args:
+            user_id (str): Must be a local user_id
+            other_user_id (str)
+
+        Returns:
+            bool|None: None if they don't share a room, otherwise whether they
+            share a private room or not.
+        """
+        return self._simple_select_one_onecol(
+            table="users_who_share_rooms",
+            keyvalues={
+                "user_id": user_id,
+                "other_user_id": other_user_id,
+            },
+            retcol="share_private",
+            allow_none=True,
+            desc="get_if_users_share_a_room",
+        )
+
+    @cachedInlineCallbacks(max_entries=500000, iterable=True)
+    def get_users_who_share_room_from_dir(self, user_id):
+        """Returns the set of users who share a room with `user_id`
+
+        Args:
+            user_id(str): Must be a local user
+
+        Returns:
+            dict: user_id -> share_private mapping
+        """
+        rows = yield self._simple_select_list(
+            table="users_who_share_rooms",
+            keyvalues={
+                "user_id": user_id,
+            },
+            retcols=("other_user_id", "share_private",),
+            desc="get_users_who_share_room_with_user",
+        )
+
+        defer.returnValue({
+            row["other_user_id"]: row["share_private"]
+            for row in rows
+        })
+
+    def get_users_in_share_dir_with_room_id(self, user_id, room_id):
+        """Get all user tuples that are in the users_who_share_rooms due to the
+        given room_id.
+
+        Returns:
+            [(user_id, other_user_id)]: where one of the two will match the given
+            user_id.
+        """
+        sql = """
+            SELECT user_id, other_user_id FROM users_who_share_rooms
+            WHERE room_id = ? AND (user_id = ? OR other_user_id = ?)
+        """
+        return self._execute(
+            "get_users_in_share_dir_with_room_id", None, sql, room_id, user_id, user_id
         )
 
+    @defer.inlineCallbacks
+    def get_rooms_in_common_for_users(self, user_id, other_user_id):
+        """Given two user_ids find out the list of rooms they share.
+        """
+        sql = """
+            SELECT room_id FROM (
+                SELECT c.room_id FROM current_state_events AS c
+                INNER JOIN room_memberships USING (event_id)
+                WHERE type = 'm.room.member'
+                    AND membership = 'join'
+                    AND state_key = ?
+            ) AS f1 INNER JOIN (
+                SELECT c.room_id FROM current_state_events AS c
+                INNER JOIN room_memberships USING (event_id)
+                WHERE type = 'm.room.member'
+                    AND membership = 'join'
+                    AND state_key = ?
+            ) f2 USING (room_id)
+        """
+
+        rows = yield self._execute(
+            "get_rooms_in_common_for_users", None, sql, user_id, other_user_id
+        )
+
+        defer.returnValue([room_id for room_id, in rows])
+
     def delete_all_from_user_dir(self):
         """Delete the entire user directory
         """
@@ -280,8 +515,11 @@ class UserDirectoryStore(SQLBaseStore):
             txn.execute("DELETE FROM user_directory")
             txn.execute("DELETE FROM user_directory_search")
             txn.execute("DELETE FROM users_in_pubic_room")
+            txn.execute("DELETE FROM users_who_share_rooms")
             txn.call_after(self.get_user_in_directory.invalidate_all)
             txn.call_after(self.get_user_in_public_room.invalidate_all)
+            txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all)
+            txn.call_after(self.get_if_users_share_a_room.invalidate_all)
         return self.runInteraction(
             "delete_all_from_user_dir", _delete_all_from_user_dir_txn
         )
@@ -374,7 +612,7 @@ class UserDirectoryStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def search_user_dir(self, search_term, limit):
+    def search_user_dir(self, user_id, search_term, limit):
         """Searches for users in directory
 
         Returns:
@@ -391,38 +629,72 @@ class UserDirectoryStore(SQLBaseStore):
                     ]
                 }
         """
-
-        search_query = _parse_query(self.database_engine, search_term)
-
         if isinstance(self.database_engine, PostgresEngine):
+            full_query, exact_query, prefix_query = _parse_query_postgres(search_term)
+
             # We order by rank and then if they have profile info
+            # The ranking algorithm is hand tweaked for "best" results. Broadly
+            # the idea is we give a higher weight to exact matches.
+            # The array of numbers are the weights for the various part of the
+            # search: (domain, _, display name, localpart)
             sql = """
-                SELECT user_id, display_name, avatar_url
+                SELECT d.user_id, display_name, avatar_url
                 FROM user_directory_search
-                INNER JOIN user_directory USING (user_id)
-                INNER JOIN users_in_pubic_room USING (user_id)
-                WHERE vector @@ to_tsquery('english', ?)
+                INNER JOIN user_directory AS d USING (user_id)
+                LEFT JOIN users_in_pubic_room AS p USING (user_id)
+                LEFT JOIN (
+                    SELECT other_user_id AS user_id FROM users_who_share_rooms
+                    WHERE user_id = ? AND share_private
+                ) AS s USING (user_id)
+                WHERE
+                    (s.user_id IS NOT NULL OR p.user_id IS NOT NULL)
+                    AND vector @@ to_tsquery('english', ?)
                 ORDER BY
-                    ts_rank_cd(vector, to_tsquery('english', ?), 1) DESC,
+                    (CASE WHEN s.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END)
+                    * (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END)
+                    * (CASE WHEN avatar_url IS NOT NULL THEN 1.2 ELSE 1.0 END)
+                    * (
+                        3 * ts_rank_cd(
+                            '{0.1, 0.1, 0.9, 1.0}',
+                            vector,
+                            to_tsquery('english', ?),
+                            8
+                        )
+                        + ts_rank_cd(
+                            '{0.1, 0.1, 0.9, 1.0}',
+                            vector,
+                            to_tsquery('english', ?),
+                            8
+                        )
+                    )
+                    DESC,
                     display_name IS NULL,
                     avatar_url IS NULL
                 LIMIT ?
             """
-            args = (search_query, search_query, limit + 1,)
+            args = (user_id, full_query, exact_query, prefix_query, limit + 1,)
         elif isinstance(self.database_engine, Sqlite3Engine):
+            search_query = _parse_query_sqlite(search_term)
+
             sql = """
-                SELECT user_id, display_name, avatar_url
+                SELECT d.user_id, display_name, avatar_url
                 FROM user_directory_search
-                INNER JOIN user_directory USING (user_id)
-                INNER JOIN users_in_pubic_room USING (user_id)
-                WHERE value MATCH ?
+                INNER JOIN user_directory AS d USING (user_id)
+                LEFT JOIN users_in_pubic_room AS p USING (user_id)
+                LEFT JOIN (
+                    SELECT other_user_id AS user_id FROM users_who_share_rooms
+                    WHERE user_id = ? AND share_private
+                ) AS s USING (user_id)
+                WHERE
+                    (s.user_id IS NOT NULL OR p.user_id IS NOT NULL)
+                    AND value MATCH ?
                 ORDER BY
                     rank(matchinfo(user_directory_search)) DESC,
                     display_name IS NULL,
                     avatar_url IS NULL
                 LIMIT ?
             """
-            args = (search_query, limit + 1)
+            args = (user_id, search_query, limit + 1)
         else:
             # This should be unreachable.
             raise Exception("Unrecognized database engine")
@@ -439,7 +711,7 @@ class UserDirectoryStore(SQLBaseStore):
         })
 
 
-def _parse_query(database_engine, search_term):
+def _parse_query_sqlite(search_term):
     """Takes a plain unicode string from the user and converts it into a form
     that can be passed to database.
     We use this so that we can add prefix matching, which isn't something
@@ -451,11 +723,21 @@ def _parse_query(database_engine, search_term):
 
     # Pull out the individual words, discarding any non-word characters.
     results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
+    return " & ".join("(%s* | %s)" % (result, result,) for result in results)
+
+
+def _parse_query_postgres(search_term):
+    """Takes a plain unicode string from the user and converts it into a form
+    that can be passed to database.
+    We use this so that we can add prefix matching, which isn't something
+    that is supported by default.
+    """
+
+    # Pull out the individual words, discarding any non-word characters.
+    results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
+
+    both = " & ".join("(%s:* | %s)" % (result, result,) for result in results)
+    exact = " & ".join("%s" % (result,) for result in results)
+    prefix = " & ".join("%s:*" % (result,) for result in results)
 
-    if isinstance(database_engine, PostgresEngine):
-        return " & ".join("(%s:* | %s)" % (result, result,) for result in results)
-    elif isinstance(database_engine, Sqlite3Engine):
-        return " & ".join("(%s* | %s)" % (result, result,) for result in results)
-    else:
-        # This should be unreachable.
-        raise Exception("Unrecognized database engine")
+    return both, exact, prefix
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 4a83c46d98..4adae96681 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -16,7 +16,7 @@
 import synapse.metrics
 import os
 
-CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
 
 metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
 
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index cbdff86596..af65bfe7b8 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -16,6 +16,7 @@ import logging
 
 from synapse.util.async import ObservableDeferred
 from synapse.util import unwrapFirstError, logcontext
+from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
 from synapse.util.stringutils import to_ascii
@@ -25,7 +26,6 @@ from . import register_cache
 from twisted.internet import defer
 from collections import namedtuple
 
-import os
 import functools
 import inspect
 import threading
@@ -37,9 +37,6 @@ logger = logging.getLogger(__name__)
 _CacheSentinel = object()
 
 
-CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
-
-
 class CacheEntry(object):
     __slots__ = [
         "deferred", "sequence", "callbacks", "invalidated"
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index cbdde34a57..6ad53a6390 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -94,6 +94,9 @@ class ExpiringCache(object):
 
         return entry.value
 
+    def __contains__(self, key):
+        return key in self._cache
+
     def get(self, key, default=None):
         try:
             return self[key]
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index c498aee46c..941d873ab8 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,20 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.caches import register_cache
+from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
 
 
 from blist import sorteddict
 import logging
-import os
 
 
 logger = logging.getLogger(__name__)
 
 
-CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
-
-
 class StreamChangeCache(object):
     """Keeps track of the stream positions of the latest change in a set of entities.
 
@@ -96,10 +92,10 @@ class StreamChangeCache(object):
 
         if stream_pos >= self._earliest_known_stream_pos:
             self.metrics.inc_hits()
-            if stream_pos >= max(self._cache):
-                return False
-            else:
-                return True
+            keys = self._cache.keys()
+            i = keys.bisect_right(stream_pos)
+
+            return i < len(keys)
         else:
             self.metrics.inc_misses()
             return True
diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py
index 281eb16254..3cfa21c9f8 100644
--- a/tests/storage/test__base.py
+++ b/tests/storage/test__base.py
@@ -241,7 +241,7 @@ class CacheDecoratorTestCase(unittest.TestCase):
         callcount2 = [0]
 
         class A(object):
-            @cached(max_entries=20)  # HACK: This makes it 2 due to cache factor
+            @cached(max_entries=4)  # HACK: This makes it 2 due to cache factor
             def func(self, key):
                 callcount[0] += 1
                 return key
diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py
index 1f0c0e7c37..03df697575 100644
--- a/tests/storage/test_client_ips.py
+++ b/tests/storage/test_client_ips.py
@@ -43,10 +43,7 @@ class ClientIpStoreTestCase(tests.unittest.TestCase):
             "access_token", "ip", "user_agent", "device_id",
         )
 
-        # deliberately use an iterable here to make sure that the lookup
-        # method doesn't iterate it twice
-        device_list = iter(((user_id, "device_id"),))
-        result = yield self.store.get_last_client_ip_by_device(device_list)
+        result = yield self.store.get_last_client_ip_by_device(user_id, "device_id")
 
         r = result[(user_id, "device_id")]
         self.assertDictContainsSubset(
diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py
deleted file mode 100644
index 14443b53bc..0000000000
--- a/tests/storage/test_events.py
+++ /dev/null
@@ -1,115 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from mock import Mock
-from synapse.types import RoomID, UserID
-
-from tests import unittest
-from twisted.internet import defer
-from tests.storage.event_injector import EventInjector
-
-from tests.utils import setup_test_homeserver
-
-
-class EventsStoreTestCase(unittest.TestCase):
-
-    @defer.inlineCallbacks
-    def setUp(self):
-        self.hs = yield setup_test_homeserver(
-            resource_for_federation=Mock(),
-            http_client=None,
-        )
-        self.store = self.hs.get_datastore()
-        self.db_pool = self.hs.get_db_pool()
-        self.message_handler = self.hs.get_handlers().message_handler
-        self.event_injector = EventInjector(self.hs)
-
-    @defer.inlineCallbacks
-    def test_count_daily_messages(self):
-        yield self.db_pool.runQuery("DELETE FROM stats_reporting")
-
-        self.hs.clock.now = 100
-
-        # Never reported before, and nothing which could be reported
-        count = yield self.store.count_daily_messages()
-        self.assertIsNone(count)
-        count = yield self.db_pool.runQuery("SELECT COUNT(*) FROM stats_reporting")
-        self.assertEqual([(0,)], count)
-
-        # Create something to report
-        room = RoomID.from_string("!abc123:test")
-        user = UserID.from_string("@raccoonlover:test")
-        yield self.event_injector.create_room(room, user)
-
-        self.base_event = yield self._get_last_stream_token()
-
-        yield self.event_injector.inject_message(room, user, "Raccoons are really cute")
-
-        # Never reported before, something could be reported, but isn't because
-        # it isn't old enough.
-        count = yield self.store.count_daily_messages()
-        self.assertIsNone(count)
-        yield self._assert_stats_reporting(1, self.hs.clock.now)
-
-        # Already reported yesterday, two new events from today.
-        yield self.event_injector.inject_message(room, user, "Yeah they are!")
-        yield self.event_injector.inject_message(room, user, "Incredibly!")
-        self.hs.clock.now += 60 * 60 * 24
-        count = yield self.store.count_daily_messages()
-        self.assertEqual(2, count)  # 2 since yesterday
-        yield self._assert_stats_reporting(3, self.hs.clock.now)  # 3 ever
-
-        # Last reported too recently.
-        yield self.event_injector.inject_message(room, user, "Who could disagree?")
-        self.hs.clock.now += 60 * 60 * 22
-        count = yield self.store.count_daily_messages()
-        self.assertIsNone(count)
-        yield self._assert_stats_reporting(4, self.hs.clock.now)
-
-        # Last reported too long ago
-        yield self.event_injector.inject_message(room, user, "No one.")
-        self.hs.clock.now += 60 * 60 * 26
-        count = yield self.store.count_daily_messages()
-        self.assertIsNone(count)
-        yield self._assert_stats_reporting(5, self.hs.clock.now)
-
-        # And now let's actually report something
-        yield self.event_injector.inject_message(room, user, "Indeed.")
-        yield self.event_injector.inject_message(room, user, "Indeed.")
-        yield self.event_injector.inject_message(room, user, "Indeed.")
-        # A little over 24 hours is fine :)
-        self.hs.clock.now += (60 * 60 * 24) + 50
-        count = yield self.store.count_daily_messages()
-        self.assertEqual(3, count)
-        yield self._assert_stats_reporting(8, self.hs.clock.now)
-
-    @defer.inlineCallbacks
-    def _get_last_stream_token(self):
-        rows = yield self.db_pool.runQuery(
-            "SELECT stream_ordering"
-            " FROM events"
-            " ORDER BY stream_ordering DESC"
-            " LIMIT 1"
-        )
-        if not rows:
-            defer.returnValue(0)
-        else:
-            defer.returnValue(rows[0][0])
-
-    @defer.inlineCallbacks
-    def _assert_stats_reporting(self, messages, time):
-        rows = yield self.db_pool.runQuery(
-            "SELECT reported_stream_token, reported_time FROM stats_reporting"
-        )
-        self.assertEqual([(self.base_event + messages, time,)], rows)
diff --git a/tests/test_state.py b/tests/test_state.py
index 6454f994e3..feb84f3d48 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -143,6 +143,7 @@ class StateTestCase(unittest.TestCase):
                 "add_event_hashes",
                 "get_events",
                 "get_next_state_group",
+                "get_state_group_delta",
             ]
         )
         hs = Mock(spec_set=[
@@ -154,6 +155,7 @@ class StateTestCase(unittest.TestCase):
         hs.get_auth.return_value = Auth(hs)
 
         self.store.get_next_state_group.side_effect = Mock
+        self.store.get_state_group_delta.return_value = (None, None)
 
         self.state = StateHandler(hs)
         self.event_id = 0