summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorAmber Brown <hawkowl@atleastfornow.net>2018-09-03 21:08:35 +1000
committerGitHub <noreply@github.com>2018-09-03 21:08:35 +1000
commit4fc4b881c58fd638db5f4dac0863721111b67af0 (patch)
treecc1604f5e3b4e0a263e0e11a55b62ef4006a64a1 /synapse/handlers
parentThe project `matrix-synapse-auto-deploy` does not seem to be maintained anymore. (diff)
parentMerge pull request #3777 from matrix-org/neilj/fix_register_user_registration (diff)
downloadsynapse-4fc4b881c58fd638db5f4dac0863721111b67af0.tar.xz
Merge branch 'develop' into develop
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/__init__.py10
-rw-r--r--synapse/handlers/_base.py6
-rw-r--r--synapse/handlers/admin.py4
-rw-r--r--synapse/handlers/appservice.py28
-rw-r--r--synapse/handlers/auth.py129
-rw-r--r--synapse/handlers/deactivate_account.py55
-rw-r--r--synapse/handlers/device.py20
-rw-r--r--synapse/handlers/devicemessage.py3
-rw-r--r--synapse/handlers/directory.py9
-rw-r--r--synapse/handlers/e2e_keys.py17
-rw-r--r--synapse/handlers/events.py40
-rw-r--r--synapse/handlers/federation.py665
-rw-r--r--synapse/handlers/groups_local.py7
-rw-r--r--synapse/handlers/identity.py102
-rw-r--r--synapse/handlers/initial_sync.py53
-rw-r--r--synapse/handlers/message.py431
-rw-r--r--synapse/handlers/pagination.py298
-rw-r--r--synapse/handlers/presence.py54
-rw-r--r--synapse/handlers/profile.py128
-rw-r--r--synapse/handlers/read_marker.py7
-rw-r--r--synapse/handlers/receipts.py26
-rw-r--r--synapse/handlers/register.py26
-rw-r--r--synapse/handlers/room.py93
-rw-r--r--synapse/handlers/room_list.py28
-rw-r--r--synapse/handlers/room_member.py30
-rw-r--r--synapse/handlers/room_member_worker.py40
-rw-r--r--synapse/handlers/search.py33
-rw-r--r--synapse/handlers/set_password.py1
-rw-r--r--synapse/handlers/sync.py367
-rw-r--r--synapse/handlers/typing.py11
-rw-r--r--synapse/handlers/user_directory.py20
31 files changed, 1731 insertions, 1010 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index d358842b3e..413425fed1 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -13,13 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .register import RegistrationHandler
-from .room import RoomContextHandler
-from .message import MessageHandler
-from .federation import FederationHandler
-from .directory import DirectoryHandler
 from .admin import AdminHandler
+from .directory import DirectoryHandler
+from .federation import FederationHandler
 from .identity import IdentityHandler
+from .register import RegistrationHandler
 from .search import SearchHandler
 
 
@@ -44,10 +42,8 @@ class Handlers(object):
 
     def __init__(self, hs):
         self.registration_handler = RegistrationHandler(hs)
-        self.message_handler = MessageHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
-        self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 2d1db0c245..704181d2d3 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -18,11 +18,10 @@ import logging
 from twisted.internet import defer
 
 import synapse.types
-from synapse.api.constants import Membership, EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import LimitExceededError
 from synapse.types import UserID
 
-
 logger = logging.getLogger(__name__)
 
 
@@ -113,8 +112,9 @@ class BaseHandler(object):
             guest_access = event.content.get("guest_access", "forbidden")
             if guest_access != "can_join":
                 if context:
+                    current_state_ids = yield context.get_current_state_ids(self.store)
                     current_state = yield self.store.get_events(
-                        list(context.current_state_ids.values())
+                        list(current_state_ids.values())
                     )
                 else:
                     current_state = yield self.state_handler.get_current_state(
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index f36b358b45..5d629126fc 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -13,12 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from twisted.internet import defer
 
 from ._base import BaseHandler
 
-import logging
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 1c29c43a83..f0f89af7dc 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -13,19 +13,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import logging
 
 from six import itervalues
 
+from prometheus_client import Counter
+
+from twisted.internet import defer
+
 import synapse
 from synapse.api.constants import EventTypes
-from synapse.util.metrics import Measure
-from synapse.util.logcontext import (
-    make_deferred_yieldable, run_in_background,
+from synapse.metrics import (
+    event_processing_loop_counter,
+    event_processing_loop_room_count,
 )
-from prometheus_client import Counter
-
-import logging
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
@@ -107,7 +111,9 @@ class ApplicationServicesHandler(object):
                             yield self._check_user_exists(event.state_key)
 
                         if not self.started_scheduler:
-                            self.scheduler.start().addErrback(log_failure)
+                            def start_scheduler():
+                                return self.scheduler.start().addErrback(log_failure)
+                            run_as_background_process("as_scheduler", start_scheduler)
                             self.started_scheduler = True
 
                         # Fork off pushes to these services
@@ -134,6 +140,12 @@ class ApplicationServicesHandler(object):
 
                     events_processed_counter.inc(len(events))
 
+                    event_processing_loop_room_count.labels(
+                        "appservice_sender"
+                    ).inc(len(events_by_room))
+
+                    event_processing_loop_counter.labels("appservice_sender").inc()
+
                     synapse.metrics.event_processing_lag.labels(
                         "appservice_sender").set(now - ts)
                     synapse.metrics.event_processing_last_ts.labels(
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 3c0051586d..4a81bd2ba9 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -13,29 +13,34 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+import logging
+import unicodedata
+
+import attr
+import bcrypt
+import pymacaroons
+from canonicaljson import json
+
 from twisted.internet import defer, threads
+from twisted.web.client import PartialDownloadError
 
-from ._base import BaseHandler
+import synapse.util.stringutils as stringutils
 from synapse.api.constants import LoginType
 from synapse.api.errors import (
-    AuthError, Codes, InteractiveAuthIncompleteError, LoginError, StoreError,
+    AuthError,
+    Codes,
+    InteractiveAuthIncompleteError,
+    LoginError,
+    StoreError,
     SynapseError,
 )
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
-from synapse.util.async import run_on_reactor
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logcontext import make_deferred_yieldable
 
-from twisted.web.client import PartialDownloadError
-
-import logging
-import bcrypt
-import pymacaroons
-import simplejson
-
-import synapse.util.stringutils as stringutils
-
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -402,7 +407,7 @@ class AuthHandler(BaseHandler):
         except PartialDownloadError as pde:
             # Twisted is silly
             data = pde.response
-            resp_body = simplejson.loads(data)
+            resp_body = json.loads(data)
 
         if 'success' in resp_body:
             # Note that we do NOT check the hostname here: we explicitly
@@ -423,15 +428,11 @@ class AuthHandler(BaseHandler):
     def _check_msisdn(self, authdict, _):
         return self._check_threepid('msisdn', authdict)
 
-    @defer.inlineCallbacks
     def _check_dummy_auth(self, authdict, _):
-        yield run_on_reactor()
-        defer.returnValue(True)
+        return defer.succeed(True)
 
     @defer.inlineCallbacks
     def _check_threepid(self, medium, authdict):
-        yield run_on_reactor()
-
         if 'threepid_creds' not in authdict:
             raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
 
@@ -519,6 +520,7 @@ class AuthHandler(BaseHandler):
         """
         logger.info("Logging in user %s on device %s", user_id, device_id)
         access_token = yield self.issue_access_token(user_id, device_id)
+        yield self.auth.check_auth_blocking(user_id)
 
         # the device *should* have been registered before we got here; however,
         # it's possible we raced against a DELETE operation. The thing we
@@ -626,6 +628,7 @@ class AuthHandler(BaseHandler):
         # special case to check for "password" for the check_password interface
         # for the auth providers
         password = login_submission.get("password")
+
         if login_type == LoginType.PASSWORD:
             if not self._password_enabled:
                 raise SynapseError(400, "Password login has been disabled.")
@@ -707,9 +710,10 @@ class AuthHandler(BaseHandler):
         multiple inexact matches.
 
         Args:
-            user_id (str): complete @user:id
+            user_id (unicode): complete @user:id
+            password (unicode): the provided password
         Returns:
-            (str) the canonical_user_id, or None if unknown user / bad password
+            (unicode) the canonical_user_id, or None if unknown user / bad password
         """
         lookupres = yield self._find_user_id_and_pwd_hash(user_id)
         if not lookupres:
@@ -728,15 +732,18 @@ class AuthHandler(BaseHandler):
                                                   device_id)
         defer.returnValue(access_token)
 
+    @defer.inlineCallbacks
     def validate_short_term_login_token_and_get_user_id(self, login_token):
         auth_api = self.hs.get_auth()
+        user_id = None
         try:
             macaroon = pymacaroons.Macaroon.deserialize(login_token)
             user_id = auth_api.get_user_id_from_macaroon(macaroon)
             auth_api.validate_macaroon(macaroon, "login", True, user_id)
-            return user_id
         except Exception:
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+        yield self.auth.check_auth_blocking(user_id)
+        defer.returnValue(user_id)
 
     @defer.inlineCallbacks
     def delete_access_token(self, access_token):
@@ -821,14 +828,37 @@ class AuthHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def delete_threepid(self, user_id, medium, address):
+        """Attempts to unbind the 3pid on the identity servers and deletes it
+        from the local database.
+
+        Args:
+            user_id (str)
+            medium (str)
+            address (str)
+
+        Returns:
+            Deferred[bool]: Returns True if successfully unbound the 3pid on
+            the identity server, False if identity server doesn't support the
+            unbind API.
+        """
+
         # 'Canonicalise' email addresses as per above
         if medium == 'email':
             address = address.lower()
 
-        ret = yield self.store.user_delete_threepid(
+        identity_handler = self.hs.get_handlers().identity_handler
+        result = yield identity_handler.try_unbind_threepid(
+            user_id,
+            {
+                'medium': medium,
+                'address': address,
+            },
+        )
+
+        yield self.store.user_delete_threepid(
             user_id, medium, address,
         )
-        defer.returnValue(ret)
+        defer.returnValue(result)
 
     def _save_session(self, session):
         # TODO: Persistent storage
@@ -840,45 +870,62 @@ class AuthHandler(BaseHandler):
         """Computes a secure hash of password.
 
         Args:
-            password (str): Password to hash.
+            password (unicode): Password to hash.
 
         Returns:
-            Deferred(str): Hashed password.
+            Deferred(unicode): Hashed password.
         """
         def _do_hash():
-            return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
-                                 bcrypt.gensalt(self.bcrypt_rounds))
-
-        return make_deferred_yieldable(threads.deferToThread(_do_hash))
+            # Normalise the Unicode in the password
+            pw = unicodedata.normalize("NFKC", password)
+
+            return bcrypt.hashpw(
+                pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
+                bcrypt.gensalt(self.bcrypt_rounds),
+            ).decode('ascii')
+
+        return make_deferred_yieldable(
+            threads.deferToThreadPool(
+                self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
+            ),
+        )
 
     def validate_hash(self, password, stored_hash):
         """Validates that self.hash(password) == stored_hash.
 
         Args:
-            password (str): Password to hash.
-            stored_hash (str): Expected hash value.
+            password (unicode): Password to hash.
+            stored_hash (unicode): Expected hash value.
 
         Returns:
             Deferred(bool): Whether self.hash(password) == stored_hash.
         """
 
         def _do_validate_hash():
+            # Normalise the Unicode in the password
+            pw = unicodedata.normalize("NFKC", password)
+
             return bcrypt.checkpw(
-                password.encode('utf8') + self.hs.config.password_pepper,
+                pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
                 stored_hash.encode('utf8')
             )
 
         if stored_hash:
-            return make_deferred_yieldable(threads.deferToThread(_do_validate_hash))
+            return make_deferred_yieldable(
+                threads.deferToThreadPool(
+                    self.hs.get_reactor(),
+                    self.hs.get_reactor().getThreadPool(),
+                    _do_validate_hash,
+                ),
+            )
         else:
             return defer.succeed(False)
 
 
-class MacaroonGeneartor(object):
-    def __init__(self, hs):
-        self.clock = hs.get_clock()
-        self.server_name = hs.config.server_name
-        self.macaroon_secret_key = hs.config.macaroon_secret_key
+@attr.s
+class MacaroonGenerator(object):
+
+    hs = attr.ib()
 
     def generate_access_token(self, user_id, extra_caveats=None):
         extra_caveats = extra_caveats or []
@@ -896,7 +943,7 @@ class MacaroonGeneartor(object):
     def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
         macaroon = self._generate_base_macaroon(user_id)
         macaroon.add_first_party_caveat("type = login")
-        now = self.clock.time_msec()
+        now = self.hs.get_clock().time_msec()
         expiry = now + duration_in_ms
         macaroon.add_first_party_caveat("time < %d" % (expiry,))
         return macaroon.serialize()
@@ -908,9 +955,9 @@ class MacaroonGeneartor(object):
 
     def _generate_base_macaroon(self, user_id):
         macaroon = pymacaroons.Macaroon(
-            location=self.server_name,
+            location=self.hs.config.server_name,
             identifier="key",
-            key=self.macaroon_secret_key)
+            key=self.hs.config.macaroon_secret_key)
         macaroon.add_first_party_caveat("gen = 1")
         macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
         return macaroon
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index c5e92f6214..b078df4a76 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -12,13 +12,15 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from twisted.internet import defer, reactor
+import logging
 
-from ._base import BaseHandler
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
 from synapse.types import UserID, create_requester
 from synapse.util.logcontext import run_in_background
 
-import logging
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -30,6 +32,7 @@ class DeactivateAccountHandler(BaseHandler):
         self._auth_handler = hs.get_auth_handler()
         self._device_handler = hs.get_device_handler()
         self._room_member_handler = hs.get_room_member_handler()
+        self._identity_handler = hs.get_handlers().identity_handler
         self.user_directory_handler = hs.get_user_directory_handler()
 
         # Flag that indicates whether the process to part users from rooms is running
@@ -37,29 +40,58 @@ class DeactivateAccountHandler(BaseHandler):
 
         # Start the user parter loop so it can resume parting users from rooms where
         # it left off (if it has work left to do).
-        reactor.callWhenRunning(self._start_user_parting)
+        hs.get_reactor().callWhenRunning(self._start_user_parting)
 
     @defer.inlineCallbacks
-    def deactivate_account(self, user_id):
+    def deactivate_account(self, user_id, erase_data):
         """Deactivate a user's account
 
         Args:
             user_id (str): ID of user to be deactivated
+            erase_data (bool): whether to GDPR-erase the user's data
 
         Returns:
-            Deferred
+            Deferred[bool]: True if identity server supports removing
+            threepids, otherwise False.
         """
         # FIXME: Theoretically there is a race here wherein user resets
         # password using threepid.
 
-        # first delete any devices belonging to the user, which will also
+        # delete threepids first. We remove these from the IS so if this fails,
+        # leave the user still active so they can try again.
+        # Ideally we would prevent password resets and then do this in the
+        # background thread.
+
+        # This will be set to false if the identity server doesn't support
+        # unbinding
+        identity_server_supports_unbinding = True
+
+        threepids = yield self.store.user_get_threepids(user_id)
+        for threepid in threepids:
+            try:
+                result = yield self._identity_handler.try_unbind_threepid(
+                    user_id,
+                    {
+                        'medium': threepid['medium'],
+                        'address': threepid['address'],
+                    },
+                )
+                identity_server_supports_unbinding &= result
+            except Exception:
+                # Do we want this to be a fatal error or should we carry on?
+                logger.exception("Failed to remove threepid from ID server")
+                raise SynapseError(400, "Failed to remove threepid from ID server")
+            yield self.store.user_delete_threepid(
+                user_id, threepid['medium'], threepid['address'],
+            )
+
+        # delete any devices belonging to the user, which will also
         # delete corresponding access tokens.
         yield self._device_handler.delete_all_devices_for_user(user_id)
         # then delete any remaining access tokens which weren't associated with
         # a device.
         yield self._auth_handler.delete_access_tokens_for_user(user_id)
 
-        yield self.store.user_delete_threepids(user_id)
         yield self.store.user_set_password_hash(user_id, None)
 
         # Add the user to a table of users pending deactivation (ie.
@@ -69,10 +101,17 @@ class DeactivateAccountHandler(BaseHandler):
         # delete from user directory
         yield self.user_directory_handler.handle_user_deactivated(user_id)
 
+        # Mark the user as erased, if they asked for that
+        if erase_data:
+            logger.info("Marking %s as erased", user_id)
+            yield self.store.mark_user_erased(user_id)
+
         # Now start the process that goes through that list and
         # parts users from rooms (if it isn't already running)
         self._start_user_parting()
 
+        defer.returnValue(identity_server_supports_unbinding)
+
     def _start_user_parting(self):
         """
         Start the process that goes through the table of users
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 11c6fb3657..9e017116a9 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -12,21 +12,23 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+
+from six import iteritems, itervalues
+
+from twisted.internet import defer
+
 from synapse.api import errors
 from synapse.api.constants import EventTypes
 from synapse.api.errors import FederationDeniedError
+from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util import stringutils
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.metrics import measure_func
-from synapse.types import get_domain_from_id, RoomStreamToken
-from twisted.internet import defer
-from ._base import BaseHandler
-
-import logging
+from synapse.util.retryutils import NotRetryingDestination
 
-from six import itervalues, iteritems
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -537,7 +539,7 @@ class DeviceListEduUpdater(object):
                 yield self.device_handler.notify_device_update(user_id, device_ids)
             else:
                 # Simply update the single device, since we know that is the only
-                # change (becuase of the single prev_id matching the current cache)
+                # change (because of the single prev_id matching the current cache)
                 for device_id, stream_id, prev_ids, content in pending_updates:
                     yield self.store.update_remote_device_list_cache_entry(
                         user_id, device_id, content, stream_id,
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index f147a20b73..2e2e5261de 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -18,10 +18,9 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
-from synapse.types import get_domain_from_id, UserID
+from synapse.types import UserID, get_domain_from_id
 from synapse.util.stringutils import random_string
 
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index c5b6e75e03..ef866da1b6 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -14,15 +14,16 @@
 # limitations under the License.
 
 
+import logging
+import string
+
 from twisted.internet import defer
-from ._base import BaseHandler
 
-from synapse.api.errors import SynapseError, Codes, CodeMessageException, AuthError
 from synapse.api.constants import EventTypes
+from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError
 from synapse.types import RoomAlias, UserID, get_domain_from_id
 
-import logging
-import string
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 8a2d177539..5816bf8b4f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -14,17 +14,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import simplejson as json
 import logging
 
-from canonicaljson import encode_canonical_json
-from twisted.internet import defer
 from six import iteritems
 
-from synapse.api.errors import (
-    SynapseError, CodeMessageException, FederationDeniedError,
-)
-from synapse.types import get_domain_from_id, UserID
+from canonicaljson import encode_canonical_json, json
+
+from twisted.internet import defer
+
+from synapse.api.errors import CodeMessageException, FederationDeniedError, SynapseError
+from synapse.types import UserID, get_domain_from_id
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.retryutils import NotRetryingDestination
 
@@ -80,7 +79,7 @@ class E2eKeysHandler(object):
             else:
                 remote_queries[user_id] = device_ids
 
-        # Firt get local devices.
+        # First get local devices.
         failures = {}
         results = {}
         if local_query:
@@ -357,7 +356,7 @@ def _exception_to_failure(e):
     # include ConnectionRefused and other errors
     #
     # Note that some Exceptions (notably twisted's ResponseFailed etc) don't
-    # give a string for e.message, which simplejson then fails to serialize.
+    # give a string for e.message, which json then fails to serialize.
     return {
         "status": 503, "message": str(e.message),
     }
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 8bc642675f..f772e62c28 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -13,20 +13,21 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+import random
+
 from twisted.internet import defer
 
-from synapse.util.logutils import log_function
-from synapse.types import UserID
-from synapse.events.utils import serialize_event
-from synapse.api.constants import Membership, EventTypes
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import AuthError
 from synapse.events import EventBase
+from synapse.events.utils import serialize_event
+from synapse.types import UserID
+from synapse.util.logutils import log_function
+from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
-import logging
-import random
-
-
 logger = logging.getLogger(__name__)
 
 
@@ -130,11 +131,13 @@ class EventStreamHandler(BaseHandler):
 class EventHandler(BaseHandler):
 
     @defer.inlineCallbacks
-    def get_event(self, user, event_id):
+    def get_event(self, user, room_id, event_id):
         """Retrieve a single specified event.
 
         Args:
             user (synapse.types.UserID): The user requesting the event
+            room_id (str|None): The expected room id. We'll return None if the
+                event's room does not match.
             event_id (str): The event ID to obtain.
         Returns:
             dict: An event, or None if there is no event matching this ID.
@@ -143,13 +146,26 @@ class EventHandler(BaseHandler):
             AuthError if the user does not have the rights to inspect this
             event.
         """
-        event = yield self.store.get_event(event_id)
+        event = yield self.store.get_event(event_id, check_room_id=room_id)
 
         if not event:
             defer.returnValue(None)
             return
 
-        if hasattr(event, "room_id"):
-            yield self.auth.check_joined_room(event.room_id, user.to_string())
+        users = yield self.store.get_users_in_room(event.room_id)
+        is_peeking = user.to_string() not in users
+
+        filtered = yield filter_events_for_client(
+            self.store,
+            user.to_string(),
+            [event],
+            is_peeking=is_peeking
+        )
+
+        if not filtered:
+            raise AuthError(
+                403,
+                "You don't have permission to access that event."
+            )
 
         defer.returnValue(event)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 495ac4c648..0ebf0fd188 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -20,37 +20,51 @@ import itertools
 import logging
 import sys
 
+import six
+from six import iteritems, itervalues
+from six.moves import http_client, zip
+
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
-import six
-from six.moves import http_client
-from six import iteritems
-from twisted.internet import defer
 from unpaddedbase64 import decode_base64
 
-from ._base import BaseHandler
+from twisted.internet import defer
 
+from synapse.api.constants import (
+    KNOWN_ROOM_VERSIONS,
+    EventTypes,
+    Membership,
+    RejectedReason,
+)
 from synapse.api.errors import (
-    AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
+    AuthError,
+    CodeMessageException,
     FederationDeniedError,
+    FederationError,
+    StoreError,
+    SynapseError,
 )
-from synapse.api.constants import EventTypes, Membership, RejectedReason
-from synapse.events.validator import EventValidator
-from synapse.util import unwrapFirstError, logcontext
-from synapse.util.metrics import measure_func
-from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, Linearizer
-from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
-    compute_event_signature, add_hashes_and_signatures,
+    add_hashes_and_signatures,
+    compute_event_signature,
 )
+from synapse.events.validator import EventValidator
+from synapse.replication.http.federation import (
+    ReplicationCleanRoomRestServlet,
+    ReplicationFederationSendEventsRestServlet,
+)
+from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
+from synapse.state import resolve_events_with_factory
 from synapse.types import UserID, get_domain_from_id
-
-from synapse.events.utils import prune_event
-
+from synapse.util import logcontext, unwrapFirstError
+from synapse.util.async_helpers import Linearizer
+from synapse.util.distributor import user_joined_room
+from synapse.util.frozenutils import unfreeze
+from synapse.util.logutils import log_function
 from synapse.util.retryutils import NotRetryingDestination
+from synapse.visibility import filter_events_for_server
 
-from synapse.util.distributor import user_joined_room
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -72,7 +86,7 @@ class FederationHandler(BaseHandler):
         self.hs = hs
 
         self.store = hs.get_datastore()
-        self.replication_layer = hs.get_federation_client()
+        self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
@@ -82,6 +96,18 @@ class FederationHandler(BaseHandler):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
         self._server_notices_mxid = hs.config.server_notices_mxid
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+
+        self._send_events_to_master = (
+            ReplicationFederationSendEventsRestServlet.make_client(hs)
+        )
+        self._notify_user_membership_change = (
+            ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
+        )
+        self._clean_room_for_join_client = (
+            ReplicationCleanRoomRestServlet.make_client(hs)
+        )
 
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
@@ -89,7 +115,9 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def on_receive_pdu(self, origin, pdu, get_missing=True):
+    def on_receive_pdu(
+            self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+    ):
         """ Process a PDU received via a federation /send/ transaction, or
         via backfill of missing prev_events
 
@@ -103,8 +131,10 @@ class FederationHandler(BaseHandler):
         """
 
         # We reprocess pdus when we have seen them only as outliers
-        existing = yield self.get_persisted_pdu(
-            origin, pdu.event_id, do_auth=False
+        existing = yield self.store.get_event(
+            pdu.event_id,
+            allow_none=True,
+            allow_rejected=True,
         )
 
         # FIXME: Currently we fetch an event again when we already have it
@@ -161,14 +191,11 @@ class FederationHandler(BaseHandler):
                     "Ignoring PDU %s for room %s from %s as we've left the room!",
                     pdu.event_id, pdu.room_id, origin,
                 )
-                return
+                defer.returnValue(None)
 
         state = None
-
         auth_chain = []
 
-        fetch_state = False
-
         # Get missing pdus if necessary.
         if not pdu.internal_metadata.is_outlier():
             # We only backfill backwards to the min depth.
@@ -223,26 +250,61 @@ class FederationHandler(BaseHandler):
                         list(prevs - seen)[:5],
                     )
 
-            if prevs - seen:
-                logger.info(
-                    "Still missing %d events for room %r: %r...",
-                    len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+            if sent_to_us_directly and prevs - seen:
+                # If they have sent it to us directly, and the server
+                # isn't telling us about the auth events that it's
+                # made a message referencing, we explode
+                raise FederationError(
+                    "ERROR",
+                    403,
+                    (
+                        "Your server isn't divulging details about prev_events "
+                        "referenced in this event."
+                    ),
+                    affected=pdu.event_id,
                 )
-                fetch_state = True
+            elif prevs - seen:
+                # Calculate the state of the previous events, and
+                # de-conflict them to find the current state.
+                state_groups = []
+                auth_chains = set()
+                try:
+                    # Get the state of the events we know about
+                    ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+                    state_groups.append(ours)
+
+                    # Ask the remote server for the states we don't
+                    # know about
+                    for p in prevs - seen:
+                        state, got_auth_chain = (
+                            yield self.federation_client.get_state_for_room(
+                                origin, pdu.room_id, p
+                            )
+                        )
+                        auth_chains.update(got_auth_chain)
+                        state_group = {(x.type, x.state_key): x.event_id for x in state}
+                        state_groups.append(state_group)
+
+                    # Resolve any conflicting state
+                    def fetch(ev_ids):
+                        return self.store.get_events(
+                            ev_ids, get_prev_content=False, check_redacted=False
+                        )
 
-        if fetch_state:
-            # We need to get the state at this event, since we haven't
-            # processed all the prev events.
-            logger.debug(
-                "_handle_new_pdu getting state for %s",
-                pdu.room_id
-            )
-            try:
-                state, auth_chain = yield self.replication_layer.get_state_for_room(
-                    origin, pdu.room_id, pdu.event_id,
-                )
-            except Exception:
-                logger.exception("Failed to get state for event: %s", pdu.event_id)
+                    room_version = yield self.store.get_room_version(pdu.room_id)
+                    state_map = yield resolve_events_with_factory(
+                        room_version, state_groups, {pdu.event_id: pdu}, fetch
+                    )
+
+                    state = (yield self.store.get_events(state_map.values())).values()
+                    auth_chain = list(auth_chains)
+                except Exception:
+                    raise FederationError(
+                        "ERROR",
+                        403,
+                        "We can't get valid state history.",
+                        affected=pdu.event_id,
+                    )
 
         yield self._process_received_pdu(
             origin,
@@ -299,7 +361,7 @@ class FederationHandler(BaseHandler):
         #
         # see https://github.com/matrix-org/synapse/pull/1744
 
-        missing_events = yield self.replication_layer.get_missing_events(
+        missing_events = yield self.federation_client.get_missing_events(
             origin,
             pdu.room_id,
             earliest_events_ids=list(latest),
@@ -320,11 +382,17 @@ class FederationHandler(BaseHandler):
 
         for e in missing_events:
             logger.info("Handling found event %s", e.event_id)
-            yield self.on_receive_pdu(
-                origin,
-                e,
-                get_missing=False
-            )
+            try:
+                yield self.on_receive_pdu(
+                    origin,
+                    e,
+                    get_missing=False
+                )
+            except FederationError as e:
+                if e.code == 403:
+                    logger.warn("Event %s failed history check.")
+                else:
+                    raise
 
     @log_function
     @defer.inlineCallbacks
@@ -355,7 +423,7 @@ class FederationHandler(BaseHandler):
             )
 
             try:
-                event_stream_id, max_stream_id = yield self._persist_auth_tree(
+                yield self._persist_auth_tree(
                     origin, auth_chain, state, event
                 )
             except AuthError as e:
@@ -399,7 +467,7 @@ class FederationHandler(BaseHandler):
                 yield self._handle_new_events(origin, event_infos)
 
             try:
-                context, event_stream_id, max_stream_id = yield self._handle_new_event(
+                context = yield self._handle_new_event(
                     origin,
                     event,
                     state=state,
@@ -424,24 +492,16 @@ class FederationHandler(BaseHandler):
             except StoreError:
                 logger.exception("Failed to store room.")
 
-        extra_users = []
-        if event.type == EventTypes.Member:
-            target_user_id = event.state_key
-            target_user = UserID.from_string(target_user_id)
-            extra_users.append(target_user)
-
-        self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id,
-            extra_users=extra_users
-        )
-
         if event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
                 # Only fire user_joined_room if the user has acutally
                 # joined the room. Don't bother if the user is just
                 # changing their profile info.
                 newly_joined = True
-                prev_state_id = context.prev_state_ids.get(
+
+                prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+                prev_state_id = prev_state_ids.get(
                     (event.type, event.state_key)
                 )
                 if prev_state_id:
@@ -453,84 +513,7 @@ class FederationHandler(BaseHandler):
 
                 if newly_joined:
                     user = UserID.from_string(event.state_key)
-                    yield user_joined_room(self.distributor, user, event.room_id)
-
-    @measure_func("_filter_events_for_server")
-    @defer.inlineCallbacks
-    def _filter_events_for_server(self, server_name, room_id, events):
-        event_to_state_ids = yield self.store.get_state_ids_for_events(
-            frozenset(e.event_id for e in events),
-            types=(
-                (EventTypes.RoomHistoryVisibility, ""),
-                (EventTypes.Member, None),
-            )
-        )
-
-        # We only want to pull out member events that correspond to the
-        # server's domain.
-
-        def check_match(id):
-            try:
-                return server_name == get_domain_from_id(id)
-            except Exception:
-                return False
-
-        # Parses mapping `event_id -> (type, state_key) -> state event_id`
-        # to get all state ids that we're interested in.
-        event_map = yield self.store.get_events([
-            e_id
-            for key_to_eid in list(event_to_state_ids.values())
-            for key, e_id in key_to_eid.items()
-            if key[0] != EventTypes.Member or check_match(key[1])
-        ])
-
-        event_to_state = {
-            e_id: {
-                key: event_map[inner_e_id]
-                for key, inner_e_id in key_to_eid.iteritems()
-                if inner_e_id in event_map
-            }
-            for e_id, key_to_eid in event_to_state_ids.iteritems()
-        }
-
-        def redact_disallowed(event, state):
-            if not state:
-                return event
-
-            history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
-            if history:
-                visibility = history.content.get("history_visibility", "shared")
-                if visibility in ["invited", "joined"]:
-                    # We now loop through all state events looking for
-                    # membership states for the requesting server to determine
-                    # if the server is either in the room or has been invited
-                    # into the room.
-                    for ev in state.itervalues():
-                        if ev.type != EventTypes.Member:
-                            continue
-                        try:
-                            domain = get_domain_from_id(ev.state_key)
-                        except Exception:
-                            continue
-
-                        if domain != server_name:
-                            continue
-
-                        memtype = ev.membership
-                        if memtype == Membership.JOIN:
-                            return event
-                        elif memtype == Membership.INVITE:
-                            if visibility == "invited":
-                                return event
-                    else:
-                        return prune_event(event)
-
-            return event
-
-        defer.returnValue([
-            redact_disallowed(e, event_to_state[e.event_id])
-            for e in events
-        ])
+                    yield self.user_joined_room(user, event.room_id)
 
     @log_function
     @defer.inlineCallbacks
@@ -551,7 +534,7 @@ class FederationHandler(BaseHandler):
         if dest == self.server_name:
             raise SynapseError(400, "Can't backfill from self.")
 
-        events = yield self.replication_layer.backfill(
+        events = yield self.federation_client.backfill(
             dest,
             room_id,
             limit=limit,
@@ -599,7 +582,7 @@ class FederationHandler(BaseHandler):
         state_events = {}
         events_to_state = {}
         for e_id in edges:
-            state, auth = yield self.replication_layer.get_state_for_room(
+            state, auth = yield self.federation_client.get_state_for_room(
                 destination=dest,
                 room_id=room_id,
                 event_id=e_id
@@ -641,7 +624,7 @@ class FederationHandler(BaseHandler):
                 results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
                     [
                         logcontext.run_in_background(
-                            self.replication_layer.get_pdu,
+                            self.federation_client.get_pdu,
                             [dest],
                             event_id,
                             outlier=True,
@@ -763,7 +746,7 @@ class FederationHandler(BaseHandler):
             """
             joined_users = [
                 (state_key, int(event.depth))
-                for (e_type, state_key), event in state.iteritems()
+                for (e_type, state_key), event in iteritems(state)
                 if e_type == EventTypes.Member
                 and event.membership == Membership.JOIN
             ]
@@ -780,7 +763,7 @@ class FederationHandler(BaseHandler):
                 except Exception:
                     pass
 
-            return sorted(joined_domains.iteritems(), key=lambda d: d[1])
+            return sorted(joined_domains.items(), key=lambda d: d[1])
 
         curr_domains = get_domains_from_state(curr_state)
 
@@ -843,7 +826,7 @@ class FederationHandler(BaseHandler):
         tried_domains = set(likely_domains)
         tried_domains.add(self.server_name)
 
-        event_ids = list(extremities.iterkeys())
+        event_ids = list(extremities.keys())
 
         logger.debug("calling resolve_state_groups in _maybe_backfill")
         resolve = logcontext.preserve_fn(
@@ -859,15 +842,15 @@ class FederationHandler(BaseHandler):
         states = dict(zip(event_ids, [s.state for s in states]))
 
         state_map = yield self.store.get_events(
-            [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
+            [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
             get_prev_content=False
         )
         states = {
             key: {
                 k: state_map[e_id]
-                for k, e_id in state_dict.iteritems()
+                for k, e_id in iteritems(state_dict)
                 if e_id in state_map
-            } for key, state_dict in states.iteritems()
+            } for key, state_dict in iteritems(states)
         }
 
         for e_id, _ in sorted_extremeties_tuple:
@@ -922,7 +905,7 @@ class FederationHandler(BaseHandler):
 
         Invites must be signed by the invitee's server before distribution.
         """
-        pdu = yield self.replication_layer.send_invite(
+        pdu = yield self.federation_client.send_invite(
             destination=target_host,
             room_id=event.room_id,
             event_id=event.event_id,
@@ -938,16 +921,6 @@ class FederationHandler(BaseHandler):
             [auth_id for auth_id, _ in event.auth_events],
             include_given=True
         )
-
-        for event in auth:
-            event.signatures.update(
-                compute_event_signature(
-                    event,
-                    self.hs.hostname,
-                    self.hs.config.signing_key[0]
-                )
-            )
-
         defer.returnValue([e for e in auth])
 
     @log_function
@@ -972,6 +945,9 @@ class FederationHandler(BaseHandler):
             joinee,
             "join",
             content,
+            params={
+                "ver": KNOWN_ROOM_VERSIONS,
+            },
         )
 
         # This shouldn't happen, because the RoomMemberHandler has a
@@ -981,7 +957,7 @@ class FederationHandler(BaseHandler):
 
         self.room_queues[room_id] = []
 
-        yield self.store.clean_room_for_join(room_id)
+        yield self._clean_room_for_join(room_id)
 
         handled_events = set()
 
@@ -994,7 +970,7 @@ class FederationHandler(BaseHandler):
                 target_hosts.insert(0, origin)
             except ValueError:
                 pass
-            ret = yield self.replication_layer.send_join(target_hosts, event)
+            ret = yield self.federation_client.send_join(target_hosts, event)
 
             origin = ret["origin"]
             state = ret["state"]
@@ -1020,15 +996,10 @@ class FederationHandler(BaseHandler):
                 # FIXME
                 pass
 
-            event_stream_id, max_stream_id = yield self._persist_auth_tree(
+            yield self._persist_auth_tree(
                 origin, auth_chain, state, event
             )
 
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=[joinee]
-            )
-
             logger.debug("Finished joining %s to %s", joinee, room_id)
         finally:
             room_queue = self.room_queues[room_id]
@@ -1123,7 +1094,7 @@ class FederationHandler(BaseHandler):
         # would introduce the danger of backwards-compatibility problems.
         event.internal_metadata.send_on_behalf_of = origin
 
-        context, event_stream_id, max_stream_id = yield self._handle_new_event(
+        context = yield self._handle_new_event(
             origin, event
         )
 
@@ -1133,25 +1104,17 @@ class FederationHandler(BaseHandler):
             event.signatures,
         )
 
-        extra_users = []
-        if event.type == EventTypes.Member:
-            target_user_id = event.state_key
-            target_user = UserID.from_string(target_user_id)
-            extra_users.append(target_user)
-
-        self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id, extra_users=extra_users
-        )
-
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.JOIN:
                 user = UserID.from_string(event.state_key)
-                yield user_joined_room(self.distributor, user, event.room_id)
+                yield self.user_joined_room(user, event.room_id)
+
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
 
-        state_ids = list(context.prev_state_ids.values())
+        state_ids = list(prev_state_ids.values())
         auth_chain = yield self.store.get_auth_chain(state_ids)
 
-        state = yield self.store.get_events(list(context.prev_state_ids.values()))
+        state = yield self.store.get_events(list(prev_state_ids.values()))
 
         defer.returnValue({
             "state": list(state.values()),
@@ -1213,17 +1176,7 @@ class FederationHandler(BaseHandler):
         )
 
         context = yield self.state_handler.compute_event_context(event)
-
-        event_stream_id, max_stream_id = yield self.store.persist_event(
-            event,
-            context=context,
-        )
-
-        target_user = UserID.from_string(event.state_key)
-        self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id,
-            extra_users=[target_user],
-        )
+        yield self.persist_events_and_notify([(event, context)])
 
         defer.returnValue(event)
 
@@ -1248,35 +1201,26 @@ class FederationHandler(BaseHandler):
         except ValueError:
             pass
 
-        yield self.replication_layer.send_leave(
+        yield self.federation_client.send_leave(
             target_hosts,
             event
         )
 
         context = yield self.state_handler.compute_event_context(event)
-
-        event_stream_id, max_stream_id = yield self.store.persist_event(
-            event,
-            context=context,
-        )
-
-        target_user = UserID.from_string(event.state_key)
-        self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id,
-            extra_users=[target_user],
-        )
+        yield self.persist_events_and_notify([(event, context)])
 
         defer.returnValue(event)
 
     @defer.inlineCallbacks
     def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
-                               content={},):
-        origin, pdu = yield self.replication_layer.make_membership_event(
+                               content={}, params=None):
+        origin, pdu = yield self.federation_client.make_membership_event(
             target_hosts,
             room_id,
             user_id,
             membership,
             content,
+            params=params,
         )
 
         logger.debug("Got response to make_%s: %s", membership, pdu)
@@ -1316,7 +1260,7 @@ class FederationHandler(BaseHandler):
     @log_function
     def on_make_leave_request(self, room_id, user_id):
         """ We've received a /make_leave/ request, so we create a partial
-        join event for the room and return that. We do *not* persist or
+        leave event for the room and return that. We do *not* persist or
         process it until the other server has signed it and sent it back.
         """
         builder = self.event_builder_factory.new({
@@ -1355,7 +1299,7 @@ class FederationHandler(BaseHandler):
 
         event.internal_metadata.outlier = False
 
-        context, event_stream_id, max_stream_id = yield self._handle_new_event(
+        yield self._handle_new_event(
             origin, event
         )
 
@@ -1365,23 +1309,16 @@ class FederationHandler(BaseHandler):
             event.signatures,
         )
 
-        extra_users = []
-        if event.type == EventTypes.Member:
-            target_user_id = event.state_key
-            target_user = UserID.from_string(target_user_id)
-            extra_users.append(target_user)
-
-        self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id, extra_users=extra_users
-        )
-
         defer.returnValue(None)
 
     @defer.inlineCallbacks
     def get_state_for_pdu(self, room_id, event_id):
         """Returns the state at the event. i.e. not including said event.
         """
-        yield run_on_reactor()
+
+        event = yield self.store.get_event(
+            event_id, allow_none=False, check_room_id=room_id,
+        )
 
         state_groups = yield self.store.get_state_groups(
             room_id, [event_id]
@@ -1393,8 +1330,7 @@ class FederationHandler(BaseHandler):
                 (e.type, e.state_key): e for e in state
             }
 
-            event = yield self.store.get_event(event_id)
-            if event and event.is_state():
+            if event.is_state():
                 # Get previous state
                 if "replaces_state" in event.unsigned:
                     prev_id = event.unsigned["replaces_state"]
@@ -1405,18 +1341,6 @@ class FederationHandler(BaseHandler):
                     del results[(event.type, event.state_key)]
 
             res = list(results.values())
-            for event in res:
-                # We sign these again because there was a bug where we
-                # incorrectly signed things the first time round
-                if self.is_mine_id(event.event_id):
-                    event.signatures.update(
-                        compute_event_signature(
-                            event,
-                            self.hs.hostname,
-                            self.hs.config.signing_key[0]
-                        )
-                    )
-
             defer.returnValue(res)
         else:
             defer.returnValue([])
@@ -1425,7 +1349,9 @@ class FederationHandler(BaseHandler):
     def get_state_ids_for_pdu(self, room_id, event_id):
         """Returns the state at the event. i.e. not including said event.
         """
-        yield run_on_reactor()
+        event = yield self.store.get_event(
+            event_id, allow_none=False, check_room_id=room_id,
+        )
 
         state_groups = yield self.store.get_state_groups_ids(
             room_id, [event_id]
@@ -1435,8 +1361,7 @@ class FederationHandler(BaseHandler):
             _, state = state_groups.items().pop()
             results = state
 
-            event = yield self.store.get_event(event_id)
-            if event and event.is_state():
+            if event.is_state():
                 # Get previous state
                 if "replaces_state" in event.unsigned:
                     prev_id = event.unsigned["replaces_state"]
@@ -1462,17 +1387,26 @@ class FederationHandler(BaseHandler):
             limit
         )
 
-        events = yield self._filter_events_for_server(origin, room_id, events)
+        events = yield filter_events_for_server(self.store, origin, events)
 
         defer.returnValue(events)
 
     @defer.inlineCallbacks
     @log_function
-    def get_persisted_pdu(self, origin, event_id, do_auth=True):
-        """ Get a PDU from the database with given origin and id.
+    def get_persisted_pdu(self, origin, event_id):
+        """Get an event from the database for the given server.
+
+        Args:
+            origin [str]: hostname of server which is requesting the event; we
+               will check that the server is allowed to see it.
+            event_id [str]: id of the event being requested
 
         Returns:
-            Deferred: Results in a `Pdu`.
+            Deferred[EventBase|None]: None if we know nothing about the event;
+                otherwise the (possibly-redacted) event.
+
+        Raises:
+            AuthError if the server is not currently in the room
         """
         event = yield self.store.get_event(
             event_id,
@@ -1481,32 +1415,17 @@ class FederationHandler(BaseHandler):
         )
 
         if event:
-            if self.is_mine_id(event.event_id):
-                # FIXME: This is a temporary work around where we occasionally
-                # return events slightly differently than when they were
-                # originally signed
-                event.signatures.update(
-                    compute_event_signature(
-                        event,
-                        self.hs.hostname,
-                        self.hs.config.signing_key[0]
-                    )
-                )
-
-            if do_auth:
-                in_room = yield self.auth.check_host_in_room(
-                    event.room_id,
-                    origin
-                )
-                if not in_room:
-                    raise AuthError(403, "Host not in room.")
-
-                events = yield self._filter_events_for_server(
-                    origin, event.room_id, [event]
-                )
-
-                event = events[0]
+            in_room = yield self.auth.check_host_in_room(
+                event.room_id,
+                origin
+            )
+            if not in_room:
+                raise AuthError(403, "Host not in room.")
 
+            events = yield filter_events_for_server(
+                self.store, origin, [event],
+            )
+            event = events[0]
             defer.returnValue(event)
         else:
             defer.returnValue(None)
@@ -1531,9 +1450,8 @@ class FederationHandler(BaseHandler):
                     event, context
                 )
 
-            event_stream_id, max_stream_id = yield self.store.persist_event(
-                event,
-                context=context,
+            yield self.persist_events_and_notify(
+                [(event, context)],
                 backfilled=backfilled,
             )
         except:  # noqa: E722, as we reraise the exception this is fine.
@@ -1546,15 +1464,7 @@ class FederationHandler(BaseHandler):
 
             six.reraise(tp, value, tb)
 
-        if not backfilled:
-            # this intentionally does not yield: we don't care about the result
-            # and don't need to wait for it.
-            logcontext.run_in_background(
-                self.pusher_pool.on_new_notifications,
-                event_stream_id, max_stream_id,
-            )
-
-        defer.returnValue((context, event_stream_id, max_stream_id))
+        defer.returnValue(context)
 
     @defer.inlineCallbacks
     def _handle_new_events(self, origin, event_infos, backfilled=False):
@@ -1562,6 +1472,8 @@ class FederationHandler(BaseHandler):
         should not depend on one another, e.g. this should be used to persist
         a bunch of outliers, but not a chunk of individual events that depend
         on each other for state calculations.
+
+        Notifies about the events where appropriate.
         """
         contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
@@ -1576,10 +1488,10 @@ class FederationHandler(BaseHandler):
             ], consumeErrors=True,
         ))
 
-        yield self.store.persist_events(
+        yield self.persist_events_and_notify(
             [
                 (ev_info["event"], context)
-                for ev_info, context in itertools.izip(event_infos, contexts)
+                for ev_info, context in zip(event_infos, contexts)
             ],
             backfilled=backfilled,
         )
@@ -1588,7 +1500,8 @@ class FederationHandler(BaseHandler):
     def _persist_auth_tree(self, origin, auth_events, state, event):
         """Checks the auth chain is valid (and passes auth checks) for the
         state and event. Then persists the auth chain and state atomically.
-        Persists the event seperately.
+        Persists the event separately. Notifies about the persisted events
+        where appropriate.
 
         Will attempt to fetch missing auth events.
 
@@ -1599,8 +1512,7 @@ class FederationHandler(BaseHandler):
             event (Event)
 
         Returns:
-            2-tuple of (event_stream_id, max_stream_id) from the persist_event
-            call for `event`
+            Deferred
         """
         events_to_context = {}
         for e in itertools.chain(auth_events, state):
@@ -1626,7 +1538,7 @@ class FederationHandler(BaseHandler):
                     missing_auth_events.add(e_id)
 
         for e_id in missing_auth_events:
-            m_ev = yield self.replication_layer.get_pdu(
+            m_ev = yield self.federation_client.get_pdu(
                 [origin],
                 e_id,
                 outlier=True,
@@ -1664,7 +1576,7 @@ class FederationHandler(BaseHandler):
                     raise
                 events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
 
-        yield self.store.persist_events(
+        yield self.persist_events_and_notify(
             [
                 (e, events_to_context[e.event_id])
                 for e in itertools.chain(auth_events, state)
@@ -1675,12 +1587,10 @@ class FederationHandler(BaseHandler):
             event, old_state=state
         )
 
-        event_stream_id, max_stream_id = yield self.store.persist_event(
-            event, new_event_context,
+        yield self.persist_events_and_notify(
+            [(event, new_event_context)],
         )
 
-        defer.returnValue((event_stream_id, max_stream_id))
-
     @defer.inlineCallbacks
     def _prep_event(self, origin, event, state=None, auth_events=None):
         """
@@ -1699,8 +1609,9 @@ class FederationHandler(BaseHandler):
         )
 
         if not auth_events:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=True,
+                event, prev_state_ids, for_verification=True,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -1736,8 +1647,19 @@ class FederationHandler(BaseHandler):
         defer.returnValue(context)
 
     @defer.inlineCallbacks
-    def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
+    def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects,
                       missing):
+        in_room = yield self.auth.check_host_in_room(
+            room_id,
+            origin
+        )
+        if not in_room:
+            raise AuthError(403, "Host not in room.")
+
+        event = yield self.store.get_event(
+            event_id, allow_none=False, check_room_id=room_id
+        )
+
         # Just go through and process each event in `remote_auth_chain`. We
         # don't want to fall into the trap of `missing` being wrong.
         for e in remote_auth_chain:
@@ -1747,7 +1669,6 @@ class FederationHandler(BaseHandler):
                 pass
 
         # Now get the current auth_chain for the event.
-        event = yield self.store.get_event(event_id)
         local_auth_chain = yield self.store.get_auth_chain(
             [auth_id for auth_id, _ in event.auth_events],
             include_given=True
@@ -1760,15 +1681,6 @@ class FederationHandler(BaseHandler):
             local_auth_chain, remote_auth_chain
         )
 
-        for event in ret["auth_chain"]:
-            event.signatures.update(
-                compute_event_signature(
-                    event,
-                    self.hs.hostname,
-                    self.hs.config.signing_key[0]
-                )
-            )
-
         logger.debug("on_query_auth returning: %s", ret)
 
         defer.returnValue(ret)
@@ -1794,8 +1706,8 @@ class FederationHandler(BaseHandler):
             min_depth=min_depth,
         )
 
-        missing_events = yield self._filter_events_for_server(
-            origin, room_id, missing_events,
+        missing_events = yield filter_events_for_server(
+            self.store, origin, missing_events,
         )
 
         defer.returnValue(missing_events)
@@ -1844,7 +1756,7 @@ class FederationHandler(BaseHandler):
             logger.info("Missing auth: %s", missing_auth)
             # If we don't have all the auth events, we need to get them.
             try:
-                remote_auth_chain = yield self.replication_layer.get_event_auth(
+                remote_auth_chain = yield self.federation_client.get_event_auth(
                     origin, event.room_id, event.event_id
                 )
 
@@ -1917,7 +1829,10 @@ class FederationHandler(BaseHandler):
                     (d.type, d.state_key): d for d in different_events if d
                 })
 
+                room_version = yield self.store.get_room_version(event.room_id)
+
                 new_state = self.state_handler.resolve_events(
+                    room_version,
                     [list(local_view.values()), list(remote_view.values())],
                     event
                 )
@@ -1949,9 +1864,10 @@ class FederationHandler(BaseHandler):
                         break
 
             if do_resolution:
+                prev_state_ids = yield context.get_prev_state_ids(self.store)
                 # 1. Get what we think is the auth chain.
                 auth_ids = yield self.auth.compute_auth_events(
-                    event, context.prev_state_ids
+                    event, prev_state_ids
                 )
                 local_auth_chain = yield self.store.get_auth_chain(
                     auth_ids, include_given=True
@@ -1959,7 +1875,7 @@ class FederationHandler(BaseHandler):
 
                 try:
                     # 2. Get remote difference.
-                    result = yield self.replication_layer.query_auth(
+                    result = yield self.federation_client.query_auth(
                         origin,
                         event.room_id,
                         event.event_id,
@@ -2041,21 +1957,34 @@ class FederationHandler(BaseHandler):
             k: a.event_id for k, a in iteritems(auth_events)
             if k != event_key
         }
-        context.current_state_ids = dict(context.current_state_ids)
-        context.current_state_ids.update(state_updates)
-        if context.delta_ids is not None:
-            context.delta_ids = dict(context.delta_ids)
-            context.delta_ids.update(state_updates)
-        context.prev_state_ids = dict(context.prev_state_ids)
-        context.prev_state_ids.update({
+        current_state_ids = yield context.get_current_state_ids(self.store)
+        current_state_ids = dict(current_state_ids)
+
+        current_state_ids.update(state_updates)
+
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        prev_state_ids = dict(prev_state_ids)
+
+        prev_state_ids.update({
             k: a.event_id for k, a in iteritems(auth_events)
         })
-        context.state_group = yield self.store.store_state_group(
+
+        # create a new state group as a delta from the existing one.
+        prev_group = context.state_group
+        state_group = yield self.store.store_state_group(
             event.event_id,
             event.room_id,
-            prev_group=context.prev_group,
-            delta_ids=context.delta_ids,
-            current_state_ids=context.current_state_ids,
+            prev_group=prev_group,
+            delta_ids=state_updates,
+            current_state_ids=current_state_ids,
+        )
+
+        yield context.update_state(
+            state_group=state_group,
+            current_state_ids=current_state_ids,
+            prev_state_ids=prev_state_ids,
+            prev_group=prev_group,
+            delta_ids=state_updates,
         )
 
     @defer.inlineCallbacks
@@ -2245,7 +2174,7 @@ class FederationHandler(BaseHandler):
             yield member_handler.send_membership_event(None, event, context)
         else:
             destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
-            yield self.replication_layer.forward_third_party_invite(
+            yield self.federation_client.forward_third_party_invite(
                 destinations,
                 room_id,
                 event_dict,
@@ -2295,7 +2224,8 @@ class FederationHandler(BaseHandler):
             event.content["third_party_invite"]["signed"]["token"]
         )
         original_invite = None
-        original_invite_id = context.prev_state_ids.get(key)
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        original_invite_id = prev_state_ids.get(key)
         if original_invite_id:
             original_invite = yield self.store.get_event(
                 original_invite_id, allow_none=True
@@ -2337,7 +2267,8 @@ class FederationHandler(BaseHandler):
         signed = event.content["third_party_invite"]["signed"]
         token = signed["token"]
 
-        invite_event_id = context.prev_state_ids.get(
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        invite_event_id = prev_state_ids.get(
             (EventTypes.ThirdPartyInvite, token,)
         )
 
@@ -2387,7 +2318,7 @@ class FederationHandler(BaseHandler):
                 for revocation.
         """
         try:
-            response = yield self.hs.get_simple_http_client().get_json(
+            response = yield self.http_client.get_json(
                 url,
                 {"public_key": public_key}
             )
@@ -2398,3 +2329,91 @@ class FederationHandler(BaseHandler):
             )
         if "valid" not in response or not response["valid"]:
             raise AuthError(403, "Third party certificate was invalid")
+
+    @defer.inlineCallbacks
+    def persist_events_and_notify(self, event_and_contexts, backfilled=False):
+        """Persists events and tells the notifier/pushers about them, if
+        necessary.
+
+        Args:
+            event_and_contexts(list[tuple[FrozenEvent, EventContext]])
+            backfilled (bool): Whether these events are a result of
+                backfilling or not
+
+        Returns:
+            Deferred
+        """
+        if self.config.worker_app:
+            yield self._send_events_to_master(
+                store=self.store,
+                event_and_contexts=event_and_contexts,
+                backfilled=backfilled
+            )
+        else:
+            max_stream_id = yield self.store.persist_events(
+                event_and_contexts,
+                backfilled=backfilled,
+            )
+
+            if not backfilled:  # Never notify for backfilled events
+                for event, _ in event_and_contexts:
+                    self._notify_persisted_event(event, max_stream_id)
+
+    def _notify_persisted_event(self, event, max_stream_id):
+        """Checks to see if notifier/pushers should be notified about the
+        event or not.
+
+        Args:
+            event (FrozenEvent)
+            max_stream_id (int): The max_stream_id returned by persist_events
+        """
+
+        extra_users = []
+        if event.type == EventTypes.Member:
+            target_user_id = event.state_key
+
+            # We notify for memberships if its an invite for one of our
+            # users
+            if event.internal_metadata.is_outlier():
+                if event.membership != Membership.INVITE:
+                    if not self.is_mine_id(target_user_id):
+                        return
+
+            target_user = UserID.from_string(target_user_id)
+            extra_users.append(target_user)
+        elif event.internal_metadata.is_outlier():
+            return
+
+        event_stream_id = event.internal_metadata.stream_ordering
+        self.notifier.on_new_room_event(
+            event, event_stream_id, max_stream_id,
+            extra_users=extra_users
+        )
+
+        self.pusher_pool.on_new_notifications(
+            event_stream_id, max_stream_id,
+        )
+
+    def _clean_room_for_join(self, room_id):
+        """Called to clean up any data in DB for a given room, ready for the
+        server to join the room.
+
+        Args:
+            room_id (str)
+        """
+        if self.config.worker_app:
+            return self._clean_room_for_join_client(room_id)
+        else:
+            return self.store.clean_room_for_join(room_id)
+
+    def user_joined_room(self, user, room_id):
+        """Called when a new user has joined the room
+        """
+        if self.config.worker_app:
+            return self._notify_user_membership_change(
+                room_id=room_id,
+                user_id=user.to_string(),
+                change="joined",
+            )
+        else:
+            return user_joined_room(self.distributor, user, room_id)
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index dcae083734..53e5e2648b 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -14,14 +14,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import logging
+
 from six import iteritems
 
+from twisted.internet import defer
+
 from synapse.api.errors import SynapseError
 from synapse.types import get_domain_from_id
 
-import logging
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 91a0898860..5feb3f22a6 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,16 +19,18 @@
 
 import logging
 
-import simplejson as json
+from canonicaljson import json
 
 from twisted.internet import defer
 
 from synapse.api.errors import (
-    MatrixCodeMessageException, CodeMessageException
+    CodeMessageException,
+    Codes,
+    HttpResponseException,
+    SynapseError,
 )
+
 from ._base import BaseHandler
-from synapse.util.async import run_on_reactor
-from synapse.api.errors import SynapseError, Codes
 
 logger = logging.getLogger(__name__)
 
@@ -38,6 +41,7 @@ class IdentityHandler(BaseHandler):
         super(IdentityHandler, self).__init__(hs)
 
         self.http_client = hs.get_simple_http_client()
+        self.federation_http_client = hs.get_http_client()
 
         self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
         self.trust_any_id_server_just_for_testing_do_not_use = (
@@ -60,8 +64,6 @@ class IdentityHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def threepid_from_creds(self, creds):
-        yield run_on_reactor()
-
         if 'id_server' in creds:
             id_server = creds['id_server']
         elif 'idServer' in creds:
@@ -83,7 +85,6 @@ class IdentityHandler(BaseHandler):
             )
             defer.returnValue(None)
 
-        data = {}
         try:
             data = yield self.http_client.get_json(
                 "https://%s%s" % (
@@ -92,11 +93,9 @@ class IdentityHandler(BaseHandler):
                 ),
                 {'sid': creds['sid'], 'client_secret': client_secret}
             )
-        except MatrixCodeMessageException as e:
+        except HttpResponseException as e:
             logger.info("getValidated3pid failed with Matrix error: %r", e)
-            raise SynapseError(e.code, e.msg, e.errcode)
-        except CodeMessageException as e:
-            data = json.loads(e.msg)
+            raise e.to_synapse_error()
 
         if 'medium' in data:
             defer.returnValue(data)
@@ -104,7 +103,6 @@ class IdentityHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def bind_threepid(self, creds, mxid):
-        yield run_on_reactor()
         logger.debug("binding threepid %r to %s", creds, mxid)
         data = None
 
@@ -135,13 +133,71 @@ class IdentityHandler(BaseHandler):
             )
             logger.debug("bound threepid %r to %s", creds, mxid)
         except CodeMessageException as e:
-            data = json.loads(e.msg)
+            data = json.loads(e.msg)  # XXX WAT?
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
-        yield run_on_reactor()
+    def try_unbind_threepid(self, mxid, threepid):
+        """Removes a binding from an identity server
+
+        Args:
+            mxid (str): Matrix user ID of binding to be removed
+            threepid (dict): Dict with medium & address of binding to be removed
+
+        Raises:
+            SynapseError: If we failed to contact the identity server
+
+        Returns:
+            Deferred[bool]: True on success, otherwise False if the identity
+            server doesn't support unbinding
+        """
+        logger.debug("unbinding threepid %r from %s", threepid, mxid)
+        if not self.trusted_id_servers:
+            logger.warn("Can't unbind threepid: no trusted ID servers set in config")
+            defer.returnValue(False)
+
+        # We don't track what ID server we added 3pids on (perhaps we ought to)
+        # but we assume that any of the servers in the trusted list are in the
+        # same ID server federation, so we can pick any one of them to send the
+        # deletion request to.
+        id_server = next(iter(self.trusted_id_servers))
+
+        url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
+        content = {
+            "mxid": mxid,
+            "threepid": threepid,
+        }
+        headers = {}
+        # we abuse the federation http client to sign the request, but we have to send it
+        # using the normal http client since we don't want the SRV lookup and want normal
+        # 'browser-like' HTTPS.
+        self.federation_http_client.sign_request(
+            destination=None,
+            method='POST',
+            url_bytes='/_matrix/identity/api/v1/3pid/unbind'.encode('ascii'),
+            headers_dict=headers,
+            content=content,
+            destination_is=id_server,
+        )
+        try:
+            yield self.http_client.post_json_get_json(
+                url,
+                content,
+                headers,
+            )
+        except HttpResponseException as e:
+            if e.code in (400, 404, 501,):
+                # The remote server probably doesn't support unbinding (yet)
+                logger.warn("Received %d response while unbinding threepid", e.code)
+                defer.returnValue(False)
+            else:
+                logger.error("Failed to unbind threepid on identity server: %s", e)
+                raise SynapseError(502, "Failed to contact identity server")
+
+        defer.returnValue(True)
 
+    @defer.inlineCallbacks
+    def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
         if not self._should_trust_id_server(id_server):
             raise SynapseError(
                 400, "Untrusted ID server '%s'" % id_server,
@@ -164,20 +220,15 @@ class IdentityHandler(BaseHandler):
                 params
             )
             defer.returnValue(data)
-        except MatrixCodeMessageException as e:
-            logger.info("Proxied requestToken failed with Matrix error: %r", e)
-            raise SynapseError(e.code, e.msg, e.errcode)
-        except CodeMessageException as e:
+        except HttpResponseException as e:
             logger.info("Proxied requestToken failed: %r", e)
-            raise e
+            raise e.to_synapse_error()
 
     @defer.inlineCallbacks
     def requestMsisdnToken(
             self, id_server, country, phone_number,
             client_secret, send_attempt, **kwargs
     ):
-        yield run_on_reactor()
-
         if not self._should_trust_id_server(id_server):
             raise SynapseError(
                 400, "Untrusted ID server '%s'" % id_server,
@@ -201,9 +252,6 @@ class IdentityHandler(BaseHandler):
                 params
             )
             defer.returnValue(data)
-        except MatrixCodeMessageException as e:
-            logger.info("Proxied requestToken failed with Matrix error: %r", e)
-            raise SynapseError(e.code, e.msg, e.errcode)
-        except CodeMessageException as e:
+        except HttpResponseException as e:
             logger.info("Proxied requestToken failed: %r", e)
-            raise e
+            raise e.to_synapse_error()
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 71af86fe21..e009395207 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
@@ -21,20 +23,15 @@ from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.handlers.presence import format_user_presence_state
 from synapse.streams.config import PaginationConfig
-from synapse.types import (
-    UserID, StreamToken,
-)
+from synapse.types import StreamToken, UserID
 from synapse.util import unwrapFirstError
-from synapse.util.async import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.snapshot_cache import SnapshotCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
-import logging
-
-
 logger = logging.getLogger(__name__)
 
 
@@ -151,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
             try:
                 if event.membership == Membership.JOIN:
                     room_end_token = now_token.room_key
-                    deferred_room_state = self.state_handler.get_current_state(
-                        event.room_id
+                    deferred_room_state = run_in_background(
+                        self.state_handler.get_current_state,
+                        event.room_id,
                     )
                 elif event.membership == Membership.LEAVE:
                     room_end_token = "s%d" % (event.stream_ordering,)
-                    deferred_room_state = self.store.get_state_for_events(
-                        [event.event_id], None
+                    deferred_room_state = run_in_background(
+                        self.store.get_state_for_events,
+                        [event.event_id], None,
                     )
                     deferred_room_state.addCallback(
                         lambda states: states[event.event_id]
@@ -373,6 +372,10 @@ class InitialSyncHandler(BaseHandler):
 
         @defer.inlineCallbacks
         def get_presence():
+            # If presence is disabled, return an empty list
+            if not self.hs.config.use_presence:
+                defer.returnValue([])
+
             states = yield presence_handler.get_states(
                 [m.user_id for m in room_members],
                 as_event=True,
@@ -390,19 +393,21 @@ class InitialSyncHandler(BaseHandler):
                 receipts = []
             defer.returnValue(receipts)
 
-        presence, receipts, (messages, token) = yield defer.gatherResults(
-            [
-                run_in_background(get_presence),
-                run_in_background(get_receipts),
-                run_in_background(
-                    self.store.get_recent_events_for_room,
-                    room_id,
-                    limit=limit,
-                    end_token=now_token.room_key,
-                )
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError)
+        presence, receipts, (messages, token) = yield make_deferred_yieldable(
+            defer.gatherResults(
+                [
+                    run_in_background(get_presence),
+                    run_in_background(get_receipts),
+                    run_in_background(
+                        self.store.get_recent_events_for_room,
+                        room_id,
+                        limit=limit,
+                        end_token=now_token.room_key,
+                    )
+                ],
+                consumeErrors=True,
+            ).addErrback(unwrapFirstError),
+        )
 
         messages = yield filter_events_for_client(
             self.store, user_id, messages, is_peeking=is_peeking,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1cb81b6cf8..e484061cc0 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,269 +14,50 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import simplejson
 import sys
 
-from canonicaljson import encode_canonical_json
 import six
-from six import string_types, itervalues, iteritems
-from twisted.internet import defer, reactor
+from six import iteritems, itervalues, string_types
+
+from canonicaljson import encode_canonical_json, json
+
+from twisted.internet import defer
 from twisted.internet.defer import succeed
-from twisted.python.failure import Failure
 
-from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
 from synapse.api.errors import (
-    AuthError, Codes, SynapseError,
+    AuthError,
+    Codes,
     ConsentNotGivenError,
+    NotFoundError,
+    SynapseError,
 )
 from synapse.api.urls import ConsentURIBuilder
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
-from synapse.types import (
-    UserID, RoomAlias, RoomStreamToken,
-)
-from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
+from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.types import RoomAlias, UserID
+from synapse.util.async_helpers import Linearizer
+from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
-from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
-from synapse.replication.http.send_event import send_event_to_master
 
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
 
-class PurgeStatus(object):
-    """Object tracking the status of a purge request
-
-    This class contains information on the progress of a purge request, for
-    return by get_purge_status.
-
-    Attributes:
-        status (int): Tracks whether this request has completed. One of
-            STATUS_{ACTIVE,COMPLETE,FAILED}
+class MessageHandler(object):
+    """Contains some read only APIs to get state about a room
     """
 
-    STATUS_ACTIVE = 0
-    STATUS_COMPLETE = 1
-    STATUS_FAILED = 2
-
-    STATUS_TEXT = {
-        STATUS_ACTIVE: "active",
-        STATUS_COMPLETE: "complete",
-        STATUS_FAILED: "failed",
-    }
-
-    def __init__(self):
-        self.status = PurgeStatus.STATUS_ACTIVE
-
-    def asdict(self):
-        return {
-            "status": PurgeStatus.STATUS_TEXT[self.status]
-        }
-
-
-class MessageHandler(BaseHandler):
-
     def __init__(self, hs):
-        super(MessageHandler, self).__init__(hs)
-        self.hs = hs
-        self.state = hs.get_state_handler()
+        self.auth = hs.get_auth()
         self.clock = hs.get_clock()
-
-        self.pagination_lock = ReadWriteLock()
-        self._purges_in_progress_by_room = set()
-        # map from purge id to PurgeStatus
-        self._purges_by_id = {}
-
-    def start_purge_history(self, room_id, token,
-                            delete_local_events=False):
-        """Start off a history purge on a room.
-
-        Args:
-            room_id (str): The room to purge from
-
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
-
-        Returns:
-            str: unique ID for this purge transaction.
-        """
-        if room_id in self._purges_in_progress_by_room:
-            raise SynapseError(
-                400,
-                "History purge already in progress for %s" % (room_id, ),
-            )
-
-        purge_id = random_string(16)
-
-        # we log the purge_id here so that it can be tied back to the
-        # request id in the log lines.
-        logger.info("[purge] starting purge_id %s", purge_id)
-
-        self._purges_by_id[purge_id] = PurgeStatus()
-        run_in_background(
-            self._purge_history,
-            purge_id, room_id, token, delete_local_events,
-        )
-        return purge_id
-
-    @defer.inlineCallbacks
-    def _purge_history(self, purge_id, room_id, token,
-                       delete_local_events):
-        """Carry out a history purge on a room.
-
-        Args:
-            purge_id (str): The id for this purge
-            room_id (str): The room to purge from
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
-
-        Returns:
-            Deferred
-        """
-        self._purges_in_progress_by_room.add(room_id)
-        try:
-            with (yield self.pagination_lock.write(room_id)):
-                yield self.store.purge_history(
-                    room_id, token, delete_local_events,
-                )
-            logger.info("[purge] complete")
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
-        except Exception:
-            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
-        finally:
-            self._purges_in_progress_by_room.discard(room_id)
-
-            # remove the purge from the list 24 hours after it completes
-            def clear_purge():
-                del self._purges_by_id[purge_id]
-            reactor.callLater(24 * 3600, clear_purge)
-
-    def get_purge_status(self, purge_id):
-        """Get the current status of an active purge
-
-        Args:
-            purge_id (str): purge_id returned by start_purge_history
-
-        Returns:
-            PurgeStatus|None
-        """
-        return self._purges_by_id.get(purge_id)
-
-    @defer.inlineCallbacks
-    def get_messages(self, requester, room_id=None, pagin_config=None,
-                     as_client_event=True, event_filter=None):
-        """Get messages in a room.
-
-        Args:
-            requester (Requester): The user requesting messages.
-            room_id (str): The room they want messages from.
-            pagin_config (synapse.api.streams.PaginationConfig): The pagination
-                config rules to apply, if any.
-            as_client_event (bool): True to get events in client-server format.
-            event_filter (Filter): Filter to apply to results or None
-        Returns:
-            dict: Pagination API results
-        """
-        user_id = requester.user.to_string()
-
-        if pagin_config.from_token:
-            room_token = pagin_config.from_token.room_key
-        else:
-            pagin_config.from_token = (
-                yield self.hs.get_event_sources().get_current_token_for_room(
-                    room_id=room_id
-                )
-            )
-            room_token = pagin_config.from_token.room_key
-
-        room_token = RoomStreamToken.parse(room_token)
-
-        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
-            "room_key", str(room_token)
-        )
-
-        source_config = pagin_config.get_source_config("room")
-
-        with (yield self.pagination_lock.read(room_id)):
-            membership, member_event_id = yield self._check_in_room_or_world_readable(
-                room_id, user_id
-            )
-
-            if source_config.direction == 'b':
-                # if we're going backwards, we might need to backfill. This
-                # requires that we have a topo token.
-                if room_token.topological:
-                    max_topo = room_token.topological
-                else:
-                    max_topo = yield self.store.get_max_topological_token(
-                        room_id, room_token.stream
-                    )
-
-                if membership == Membership.LEAVE:
-                    # If they have left the room then clamp the token to be before
-                    # they left the room, to save the effort of loading from the
-                    # database.
-                    leave_token = yield self.store.get_topological_token_for_event(
-                        member_event_id
-                    )
-                    leave_token = RoomStreamToken.parse(leave_token)
-                    if leave_token.topological < max_topo:
-                        source_config.from_key = str(leave_token)
-
-                yield self.hs.get_handlers().federation_handler.maybe_backfill(
-                    room_id, max_topo
-                )
-
-            events, next_key = yield self.store.paginate_room_events(
-                room_id=room_id,
-                from_key=source_config.from_key,
-                to_key=source_config.to_key,
-                direction=source_config.direction,
-                limit=source_config.limit,
-                event_filter=event_filter,
-            )
-
-            next_token = pagin_config.from_token.copy_and_replace(
-                "room_key", next_key
-            )
-
-        if not events:
-            defer.returnValue({
-                "chunk": [],
-                "start": pagin_config.from_token.to_string(),
-                "end": next_token.to_string(),
-            })
-
-        if event_filter:
-            events = event_filter.filter(events)
-
-        events = yield filter_events_for_client(
-            self.store,
-            user_id,
-            events,
-            is_peeking=(member_event_id is None),
-        )
-
-        time_now = self.clock.time_msec()
-
-        chunk = {
-            "chunk": [
-                serialize_event(e, time_now, as_client_event)
-                for e in events
-            ],
-            "start": pagin_config.from_token.to_string(),
-            "end": next_token.to_string(),
-        }
-
-        defer.returnValue(chunk)
+        self.state = hs.get_state_handler()
+        self.store = hs.get_datastore()
 
     @defer.inlineCallbacks
     def get_room_data(self, user_id=None, room_id=None,
@@ -290,12 +71,12 @@ class MessageHandler(BaseHandler):
         Raises:
             SynapseError if something went wrong.
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
             room_id, user_id
         )
 
         if membership == Membership.JOIN:
-            data = yield self.state_handler.get_current_state(
+            data = yield self.state.get_current_state(
                 room_id, event_type, state_key
             )
         elif membership == Membership.LEAVE:
@@ -308,53 +89,85 @@ class MessageHandler(BaseHandler):
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def _check_in_room_or_world_readable(self, room_id, user_id):
-        try:
-            # check_user_was_in_room will return the most recent membership
-            # event for the user if:
-            #  * The user is a non-guest user, and was ever in the room
-            #  * The user is a guest user, and has joined the room
-            # else it will throw.
-            member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
-            defer.returnValue((member_event.membership, member_event.event_id))
-            return
-        except AuthError:
-            visibility = yield self.state_handler.get_current_state(
-                room_id, EventTypes.RoomHistoryVisibility, ""
-            )
-            if (
-                visibility and
-                visibility.content["history_visibility"] == "world_readable"
-            ):
-                defer.returnValue((Membership.JOIN, None))
-                return
-            raise AuthError(
-                403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
-            )
-
-    @defer.inlineCallbacks
-    def get_state_events(self, user_id, room_id, is_guest=False):
+    def get_state_events(
+        self, user_id, room_id, types=None, filtered_types=None,
+        at_token=None, is_guest=False,
+    ):
         """Retrieve all state events for a given room. If the user is
         joined to the room then return the current state. If the user has
-        left the room return the state events from when they left.
+        left the room return the state events from when they left. If an explicit
+        'at' parameter is passed, return the state events as of that event, if
+        visible.
 
         Args:
             user_id(str): The user requesting state events.
             room_id(str): The room ID to get all state events from.
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+                May be None, which matches any key.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
+            at_token(StreamToken|None): the stream token of the at which we are requesting
+                the stats. If the user is not allowed to view the state as of that
+                stream token, we raise a 403 SynapseError. If None, returns the current
+                state based on the current_state_events table.
+            is_guest(bool): whether this user is a guest
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
+        Raises:
+            NotFoundError (404) if the at token does not yield an event
+
+            AuthError (403) if the user doesn't have permission to view
+            members of this room.
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
-            room_id, user_id
-        )
+        if at_token:
+            # FIXME this claims to get the state at a stream position, but
+            # get_recent_events_for_room operates by topo ordering. This therefore
+            # does not reliably give you the state at the given stream position.
+            # (https://github.com/matrix-org/synapse/issues/3305)
+            last_events, _ = yield self.store.get_recent_events_for_room(
+                room_id, end_token=at_token.room_key, limit=1,
+            )
 
-        if membership == Membership.JOIN:
-            room_state = yield self.state_handler.get_current_state(room_id)
-        elif membership == Membership.LEAVE:
-            room_state = yield self.store.get_state_for_events(
-                [membership_event_id], None
+            if not last_events:
+                raise NotFoundError("Can't find event for token %s" % (at_token, ))
+
+            visible_events = yield filter_events_for_client(
+                self.store, user_id, last_events,
             )
-            room_state = room_state[membership_event_id]
+
+            event = last_events[0]
+            if visible_events:
+                room_state = yield self.store.get_state_for_events(
+                    [event.event_id], types, filtered_types=filtered_types,
+                )
+                room_state = room_state[event.event_id]
+            else:
+                raise AuthError(
+                    403,
+                    "User %s not allowed to view events in room %s at token %s" % (
+                        user_id, room_id, at_token,
+                    )
+                )
+        else:
+            membership, membership_event_id = (
+                yield self.auth.check_in_room_or_world_readable(
+                    room_id, user_id,
+                )
+            )
+
+            if membership == Membership.JOIN:
+                state_ids = yield self.store.get_filtered_current_state_ids(
+                    room_id, types, filtered_types=filtered_types,
+                )
+                room_state = yield self.store.get_events(state_ids.values())
+            elif membership == Membership.LEAVE:
+                room_state = yield self.store.get_state_for_events(
+                    [membership_event_id], types, filtered_types=filtered_types,
+                )
+                room_state = room_state[membership_event_id]
 
         now = self.clock.time_msec()
         defer.returnValue(
@@ -377,7 +190,7 @@ class MessageHandler(BaseHandler):
         if not requester.app_service:
             # We check AS auth after fetching the room membership, as it
             # requires us to pull out all joined members anyway.
-            membership, _ = yield self._check_in_room_or_world_readable(
+            membership, _ = yield self.auth.check_in_room_or_world_readable(
                 room_id, user_id
             )
             if membership != Membership.JOIN:
@@ -388,7 +201,7 @@ class MessageHandler(BaseHandler):
         users_with_profile = yield self.state.get_current_user_in_room(room_id)
 
         # If this is an AS, double check that they are allowed to see the members.
-        # This can either be because the AS user is in the room or becuase there
+        # This can either be because the AS user is in the room or because there
         # is a user in the room that the AS is "interested in"
         if requester.app_service and user_id not in users_with_profile:
             for uid in users_with_profile:
@@ -422,7 +235,7 @@ class EventCreationHandler(object):
         self.notifier = hs.get_notifier()
         self.config = hs.config
 
-        self.http_client = hs.get_simple_http_client()
+        self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
 
         # This is only used to get at ratelimit function, and maybe_kick_guest_users
         self.base_handler = BaseHandler(hs)
@@ -431,7 +244,7 @@ class EventCreationHandler(object):
 
         # We arbitrarily limit concurrent event creation for a room to 5.
         # This is to stop us from diverging history *too* much.
-        self.limiter = Limiter(max_count=5)
+        self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
 
         self.action_generator = hs.get_action_generator()
 
@@ -463,10 +276,14 @@ class EventCreationHandler(object):
                 where *hashes* is a map from algorithm to hash.
 
                 If None, they will be requested from the database.
-
+        Raises:
+            ResourceLimitError if server is blocked to some resource being
+            exceeded
         Returns:
             Tuple of created event (FrozenEvent), Context
         """
+        yield self.auth.check_auth_blocking(requester.user.to_string())
+
         builder = self.event_builder_factory.new(event_dict)
 
         self.validator.validate_new(builder)
@@ -491,7 +308,7 @@ class EventCreationHandler(object):
                         target, e
                     )
 
-        is_exempt = yield self._is_exempt_from_privacy_policy(builder)
+        is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
         if not is_exempt:
             yield self.assert_accepted_privacy_policy(requester)
 
@@ -509,12 +326,13 @@ class EventCreationHandler(object):
 
         defer.returnValue((event, context))
 
-    def _is_exempt_from_privacy_policy(self, builder):
+    def _is_exempt_from_privacy_policy(self, builder, requester):
         """"Determine if an event to be sent is exempt from having to consent
         to the privacy policy
 
         Args:
             builder (synapse.events.builder.EventBuilder): event being created
+            requester (Requster): user requesting this event
 
         Returns:
             Deferred[bool]: true if the event can be sent without the user
@@ -525,6 +343,9 @@ class EventCreationHandler(object):
             membership = builder.content.get("membership", None)
             if membership == Membership.JOIN:
                 return self._is_server_notices_room(builder.room_id)
+            elif membership == Membership.LEAVE:
+                # the user is always allowed to leave (but not kick people)
+                return builder.state_key == requester.user.to_string()
         return succeed(False)
 
     @defer.inlineCallbacks
@@ -630,7 +451,8 @@ class EventCreationHandler(object):
         If so, returns the version of the event in context.
         Otherwise, returns None.
         """
-        prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        prev_event_id = prev_state_ids.get((event.type, event.state_key))
         prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
         if not prev_event:
             return
@@ -752,8 +574,8 @@ class EventCreationHandler(object):
         event = builder.build()
 
         logger.debug(
-            "Created event %s with state: %s",
-            event.event_id, context.prev_state_ids,
+            "Created event %s",
+            event.event_id,
         )
 
         defer.returnValue(
@@ -793,7 +615,7 @@ class EventCreationHandler(object):
         # Ensure that we can round trip before trying to persist in db
         try:
             dump = frozendict_json_encoder.encode(event.content)
-            simplejson.loads(dump)
+            json.loads(dump)
         except Exception:
             logger.exception("Failed to encode content: %r", event.content)
             raise
@@ -805,10 +627,9 @@ class EventCreationHandler(object):
         try:
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
-                yield send_event_to_master(
-                    self.http_client,
-                    host=self.config.worker_replication_host,
-                    port=self.config.worker_replication_http_port,
+                yield self.send_event_to_master(
+                    event_id=event.event_id,
+                    store=self.store,
                     requester=requester,
                     event=event,
                     context=context,
@@ -883,9 +704,11 @@ class EventCreationHandler(object):
                         e.sender == event.sender
                     )
 
+                current_state_ids = yield context.get_current_state_ids(self.store)
+
                 state_to_include_ids = [
                     e_id
-                    for k, e_id in iteritems(context.current_state_ids)
+                    for k, e_id in iteritems(current_state_ids)
                     if k[0] in self.hs.config.room_invite_state_types
                     or k == (EventTypes.Member, event.sender)
                 ]
@@ -921,8 +744,9 @@ class EventCreationHandler(object):
                     )
 
         if event.type == EventTypes.Redaction:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=True,
+                event, prev_state_ids, for_verification=True,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -942,26 +766,23 @@ class EventCreationHandler(object):
                         "You don't have permission to redact events"
                     )
 
-        if event.type == EventTypes.Create and context.prev_state_ids:
-            raise AuthError(
-                403,
-                "Changing the room create event is forbidden",
-            )
+        if event.type == EventTypes.Create:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
+            if prev_state_ids:
+                raise AuthError(
+                    403,
+                    "Changing the room create event is forbidden",
+                )
 
         (event_stream_id, max_stream_id) = yield self.store.persist_event(
             event, context=context
         )
 
-        # this intentionally does not yield: we don't care about the result
-        # and don't need to wait for it.
-        run_in_background(
-            self.pusher_pool.on_new_notifications,
-            event_stream_id, max_stream_id
+        self.pusher_pool.on_new_notifications(
+            event_stream_id, max_stream_id,
         )
 
-        @defer.inlineCallbacks
         def _notify():
-            yield run_on_reactor()
             try:
                 self.notifier.on_new_room_event(
                     event, event_stream_id, max_stream_id,
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
new file mode 100644
index 0000000000..5170d093e3
--- /dev/null
+++ b/synapse/handlers/pagination.py
@@ -0,0 +1,298 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 - 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+from twisted.python.failure import Failure
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import SynapseError
+from synapse.events.utils import serialize_event
+from synapse.types import RoomStreamToken
+from synapse.util.async_helpers import ReadWriteLock
+from synapse.util.logcontext import run_in_background
+from synapse.util.stringutils import random_string
+from synapse.visibility import filter_events_for_client
+
+logger = logging.getLogger(__name__)
+
+
+class PurgeStatus(object):
+    """Object tracking the status of a purge request
+
+    This class contains information on the progress of a purge request, for
+    return by get_purge_status.
+
+    Attributes:
+        status (int): Tracks whether this request has completed. One of
+            STATUS_{ACTIVE,COMPLETE,FAILED}
+    """
+
+    STATUS_ACTIVE = 0
+    STATUS_COMPLETE = 1
+    STATUS_FAILED = 2
+
+    STATUS_TEXT = {
+        STATUS_ACTIVE: "active",
+        STATUS_COMPLETE: "complete",
+        STATUS_FAILED: "failed",
+    }
+
+    def __init__(self):
+        self.status = PurgeStatus.STATUS_ACTIVE
+
+    def asdict(self):
+        return {
+            "status": PurgeStatus.STATUS_TEXT[self.status]
+        }
+
+
+class PaginationHandler(object):
+    """Handles pagination and purge history requests.
+
+    These are in the same handler due to the fact we need to block clients
+    paginating during a purge.
+    """
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+        self.pagination_lock = ReadWriteLock()
+        self._purges_in_progress_by_room = set()
+        # map from purge id to PurgeStatus
+        self._purges_by_id = {}
+
+    def start_purge_history(self, room_id, token,
+                            delete_local_events=False):
+        """Start off a history purge on a room.
+
+        Args:
+            room_id (str): The room to purge from
+
+            token (str): topological token to delete events before
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            str: unique ID for this purge transaction.
+        """
+        if room_id in self._purges_in_progress_by_room:
+            raise SynapseError(
+                400,
+                "History purge already in progress for %s" % (room_id, ),
+            )
+
+        purge_id = random_string(16)
+
+        # we log the purge_id here so that it can be tied back to the
+        # request id in the log lines.
+        logger.info("[purge] starting purge_id %s", purge_id)
+
+        self._purges_by_id[purge_id] = PurgeStatus()
+        run_in_background(
+            self._purge_history,
+            purge_id, room_id, token, delete_local_events,
+        )
+        return purge_id
+
+    @defer.inlineCallbacks
+    def _purge_history(self, purge_id, room_id, token,
+                       delete_local_events):
+        """Carry out a history purge on a room.
+
+        Args:
+            purge_id (str): The id for this purge
+            room_id (str): The room to purge from
+            token (str): topological token to delete events before
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            Deferred
+        """
+        self._purges_in_progress_by_room.add(room_id)
+        try:
+            with (yield self.pagination_lock.write(room_id)):
+                yield self.store.purge_history(
+                    room_id, token, delete_local_events,
+                )
+            logger.info("[purge] complete")
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+        except Exception:
+            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+        finally:
+            self._purges_in_progress_by_room.discard(room_id)
+
+            # remove the purge from the list 24 hours after it completes
+            def clear_purge():
+                del self._purges_by_id[purge_id]
+            self.hs.get_reactor().callLater(24 * 3600, clear_purge)
+
+    def get_purge_status(self, purge_id):
+        """Get the current status of an active purge
+
+        Args:
+            purge_id (str): purge_id returned by start_purge_history
+
+        Returns:
+            PurgeStatus|None
+        """
+        return self._purges_by_id.get(purge_id)
+
+    @defer.inlineCallbacks
+    def get_messages(self, requester, room_id=None, pagin_config=None,
+                     as_client_event=True, event_filter=None):
+        """Get messages in a room.
+
+        Args:
+            requester (Requester): The user requesting messages.
+            room_id (str): The room they want messages from.
+            pagin_config (synapse.api.streams.PaginationConfig): The pagination
+                config rules to apply, if any.
+            as_client_event (bool): True to get events in client-server format.
+            event_filter (Filter): Filter to apply to results or None
+        Returns:
+            dict: Pagination API results
+        """
+        user_id = requester.user.to_string()
+
+        if pagin_config.from_token:
+            room_token = pagin_config.from_token.room_key
+        else:
+            pagin_config.from_token = (
+                yield self.hs.get_event_sources().get_current_token_for_room(
+                    room_id=room_id
+                )
+            )
+            room_token = pagin_config.from_token.room_key
+
+        room_token = RoomStreamToken.parse(room_token)
+
+        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+            "room_key", str(room_token)
+        )
+
+        source_config = pagin_config.get_source_config("room")
+
+        with (yield self.pagination_lock.read(room_id)):
+            membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
+                room_id, user_id
+            )
+
+            if source_config.direction == 'b':
+                # if we're going backwards, we might need to backfill. This
+                # requires that we have a topo token.
+                if room_token.topological:
+                    max_topo = room_token.topological
+                else:
+                    max_topo = yield self.store.get_max_topological_token(
+                        room_id, room_token.stream
+                    )
+
+                if membership == Membership.LEAVE:
+                    # If they have left the room then clamp the token to be before
+                    # they left the room, to save the effort of loading from the
+                    # database.
+                    leave_token = yield self.store.get_topological_token_for_event(
+                        member_event_id
+                    )
+                    leave_token = RoomStreamToken.parse(leave_token)
+                    if leave_token.topological < max_topo:
+                        source_config.from_key = str(leave_token)
+
+                yield self.hs.get_handlers().federation_handler.maybe_backfill(
+                    room_id, max_topo
+                )
+
+            events, next_key = yield self.store.paginate_room_events(
+                room_id=room_id,
+                from_key=source_config.from_key,
+                to_key=source_config.to_key,
+                direction=source_config.direction,
+                limit=source_config.limit,
+                event_filter=event_filter,
+            )
+
+            next_token = pagin_config.from_token.copy_and_replace(
+                "room_key", next_key
+            )
+
+        if not events:
+            defer.returnValue({
+                "chunk": [],
+                "start": pagin_config.from_token.to_string(),
+                "end": next_token.to_string(),
+            })
+
+        if event_filter:
+            events = event_filter.filter(events)
+
+        events = yield filter_events_for_client(
+            self.store,
+            user_id,
+            events,
+            is_peeking=(member_event_id is None),
+        )
+
+        state = None
+        if event_filter and event_filter.lazy_load_members():
+            # TODO: remove redundant members
+
+            types = [
+                (EventTypes.Member, state_key)
+                for state_key in set(
+                    event.sender  # FIXME: we also care about invite targets etc.
+                    for event in events
+                )
+            ]
+
+            state_ids = yield self.store.get_state_ids_for_event(
+                events[0].event_id, types=types,
+            )
+
+            if state_ids:
+                state = yield self.store.get_events(list(state_ids.values()))
+
+            if state:
+                state = yield filter_events_for_client(
+                    self.store,
+                    user_id,
+                    state.values(),
+                    is_peeking=(member_event_id is None),
+                )
+
+        time_now = self.clock.time_msec()
+
+        chunk = {
+            "chunk": [
+                serialize_event(e, time_now, as_client_event)
+                for e in events
+            ],
+            "start": pagin_config.from_token.to_string(),
+            "end": next_token.to_string(),
+        }
+
+        if state:
+            chunk["state"] = [
+                serialize_event(e, time_now, as_client_event)
+                for e in state
+            ]
+
+        defer.returnValue(chunk)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7fe568132f..ba3856674d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -22,27 +22,26 @@ The methods that define policy are:
     - should_notify
 """
 
-from twisted.internet import defer, reactor
+import logging
 from contextlib import contextmanager
 
-from six import itervalues, iteritems
+from six import iteritems, itervalues
+
+from prometheus_client import Counter
+
+from twisted.internet import defer
 
-from synapse.api.errors import SynapseError
 from synapse.api.constants import PresenceState
+from synapse.api.errors import SynapseError
+from synapse.metrics import LaterGauge
 from synapse.storage.presence import UserPresenceState
-
+from synapse.types import UserID, get_domain_from_id
+from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.descriptors import cachedInlineCallbacks
-from synapse.util.async import Linearizer
 from synapse.util.logcontext import run_in_background
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID, get_domain_from_id
-from synapse.metrics import LaterGauge
-
-import logging
-
-from prometheus_client import Counter
 
 logger = logging.getLogger(__name__)
 
@@ -96,6 +95,7 @@ class PresenceHandler(object):
         Args:
             hs (synapse.server.HomeServer):
         """
+        self.hs = hs
         self.is_mine = hs.is_mine
         self.is_mine_id = hs.is_mine_id
         self.clock = hs.get_clock()
@@ -179,7 +179,7 @@ class PresenceHandler(object):
         # have not yet been persisted
         self.unpersisted_users_changes = set()
 
-        reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
 
         self.serial_to_user = {}
         self._next_serial = 1
@@ -231,6 +231,10 @@ class PresenceHandler(object):
         earlier than they should when synapse is restarted. This affect of this
         is some spurious presence changes that will self-correct.
         """
+        # If the DB pool has already terminated, don't try updating
+        if not self.hs.get_db_pool().running:
+            return
+
         logger.info(
             "Performing _on_shutdown. Persisting %d unpersisted changes",
             len(self.user_to_current_state)
@@ -391,6 +395,10 @@ class PresenceHandler(object):
         """We've seen the user do something that indicates they're interacting
         with the app.
         """
+        # If presence is disabled, no-op
+        if not self.hs.config.use_presence:
+            return
+
         user_id = user.to_string()
 
         bump_active_time_counter.inc()
@@ -420,6 +428,11 @@ class PresenceHandler(object):
                 Useful for streams that are not associated with an actual
                 client that is being used by a user.
         """
+        # Override if it should affect the user's presence, if presence is
+        # disabled.
+        if not self.hs.config.use_presence:
+            affect_presence = False
+
         if affect_presence:
             curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
             self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -465,13 +478,16 @@ class PresenceHandler(object):
         Returns:
             set(str): A set of user_id strings.
         """
-        syncing_user_ids = {
-            user_id for user_id, count in self.user_to_num_current_syncs.items()
-            if count
-        }
-        for user_ids in self.external_process_to_current_syncs.values():
-            syncing_user_ids.update(user_ids)
-        return syncing_user_ids
+        if self.hs.config.use_presence:
+            syncing_user_ids = {
+                user_id for user_id, count in self.user_to_num_current_syncs.items()
+                if count
+            }
+            for user_ids in self.external_process_to_current_syncs.values():
+                syncing_user_ids.update(user_ids)
+            return syncing_user_ids
+        else:
+            return set()
 
     @defer.inlineCallbacks
     def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 3465a787ab..75b8b7ce6a 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -17,19 +17,31 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError, AuthError, CodeMessageException
+from synapse.api.errors import (
+    AuthError,
+    CodeMessageException,
+    Codes,
+    StoreError,
+    SynapseError,
+)
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import UserID, get_domain_from_id
+
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
 
-class ProfileHandler(BaseHandler):
-    PROFILE_UPDATE_MS = 60 * 1000
-    PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
+class BaseProfileHandler(BaseHandler):
+    """Handles fetching and updating user profile information.
+
+    BaseProfileHandler can be instantiated directly on workers and will
+    delegate to master when necessary. The master process should use the
+    subclass MasterProfileHandler
+    """
 
     def __init__(self, hs):
-        super(ProfileHandler, self).__init__(hs)
+        super(BaseProfileHandler, self).__init__(hs)
 
         self.federation = hs.get_federation_client()
         hs.get_federation_registry().register_query_handler(
@@ -38,21 +50,21 @@ class ProfileHandler(BaseHandler):
 
         self.user_directory_handler = hs.get_user_directory_handler()
 
-        if hs.config.worker_app is None:
-            self.clock.looping_call(
-                self._update_remote_profile_cache, self.PROFILE_UPDATE_MS,
-            )
-
     @defer.inlineCallbacks
     def get_profile(self, user_id):
         target_user = UserID.from_string(user_id)
         if self.hs.is_mine(target_user):
-            displayname = yield self.store.get_profile_displayname(
-                target_user.localpart
-            )
-            avatar_url = yield self.store.get_profile_avatar_url(
-                target_user.localpart
-            )
+            try:
+                displayname = yield self.store.get_profile_displayname(
+                    target_user.localpart
+                )
+                avatar_url = yield self.store.get_profile_avatar_url(
+                    target_user.localpart
+                )
+            except StoreError as e:
+                if e.code == 404:
+                    raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+                raise
 
             defer.returnValue({
                 "displayname": displayname,
@@ -72,7 +84,6 @@ class ProfileHandler(BaseHandler):
             except CodeMessageException as e:
                 if e.code != 404:
                     logger.exception("Failed to get displayname")
-
                 raise
 
     @defer.inlineCallbacks
@@ -83,12 +94,17 @@ class ProfileHandler(BaseHandler):
         """
         target_user = UserID.from_string(user_id)
         if self.hs.is_mine(target_user):
-            displayname = yield self.store.get_profile_displayname(
-                target_user.localpart
-            )
-            avatar_url = yield self.store.get_profile_avatar_url(
-                target_user.localpart
-            )
+            try:
+                displayname = yield self.store.get_profile_displayname(
+                    target_user.localpart
+                )
+                avatar_url = yield self.store.get_profile_avatar_url(
+                    target_user.localpart
+                )
+            except StoreError as e:
+                if e.code == 404:
+                    raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+                raise
 
             defer.returnValue({
                 "displayname": displayname,
@@ -101,9 +117,14 @@ class ProfileHandler(BaseHandler):
     @defer.inlineCallbacks
     def get_displayname(self, target_user):
         if self.hs.is_mine(target_user):
-            displayname = yield self.store.get_profile_displayname(
-                target_user.localpart
-            )
+            try:
+                displayname = yield self.store.get_profile_displayname(
+                    target_user.localpart
+                )
+            except StoreError as e:
+                if e.code == 404:
+                    raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+                raise
 
             defer.returnValue(displayname)
         else:
@@ -120,7 +141,6 @@ class ProfileHandler(BaseHandler):
             except CodeMessageException as e:
                 if e.code != 404:
                     logger.exception("Failed to get displayname")
-
                 raise
             except Exception:
                 logger.exception("Failed to get displayname")
@@ -155,10 +175,14 @@ class ProfileHandler(BaseHandler):
     @defer.inlineCallbacks
     def get_avatar_url(self, target_user):
         if self.hs.is_mine(target_user):
-            avatar_url = yield self.store.get_profile_avatar_url(
-                target_user.localpart
-            )
-
+            try:
+                avatar_url = yield self.store.get_profile_avatar_url(
+                    target_user.localpart
+                )
+            except StoreError as e:
+                if e.code == 404:
+                    raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+                raise
             defer.returnValue(avatar_url)
         else:
             try:
@@ -211,16 +235,20 @@ class ProfileHandler(BaseHandler):
         just_field = args.get("field", None)
 
         response = {}
+        try:
+            if just_field is None or just_field == "displayname":
+                response["displayname"] = yield self.store.get_profile_displayname(
+                    user.localpart
+                )
 
-        if just_field is None or just_field == "displayname":
-            response["displayname"] = yield self.store.get_profile_displayname(
-                user.localpart
-            )
-
-        if just_field is None or just_field == "avatar_url":
-            response["avatar_url"] = yield self.store.get_profile_avatar_url(
-                user.localpart
-            )
+            if just_field is None or just_field == "avatar_url":
+                response["avatar_url"] = yield self.store.get_profile_avatar_url(
+                    user.localpart
+                )
+        except StoreError as e:
+            if e.code == 404:
+                raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+            raise
 
         defer.returnValue(response)
 
@@ -253,6 +281,26 @@ class ProfileHandler(BaseHandler):
                     room_id, str(e.message)
                 )
 
+
+class MasterProfileHandler(BaseProfileHandler):
+    PROFILE_UPDATE_MS = 60 * 1000
+    PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
+
+    def __init__(self, hs):
+        super(MasterProfileHandler, self).__init__(hs)
+
+        assert hs.config.worker_app is None
+
+        self.clock.looping_call(
+            self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+        )
+
+    def _start_update_remote_profile_cache(self):
+        return run_as_background_process(
+            "Update remote profile", self._update_remote_profile_cache,
+        )
+
+    @defer.inlineCallbacks
     def _update_remote_profile_cache(self):
         """Called periodically to check profiles of remote users we haven't
         checked in a while.
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index 5142ae153d..32108568c6 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -13,13 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseHandler
+import logging
 
 from twisted.internet import defer
 
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
+
+from ._base import BaseHandler
 
-import logging
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 2e0672161c..a6f3181f09 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -12,17 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from synapse.util import logcontext
-
-from ._base import BaseHandler
+import logging
 
 from twisted.internet import defer
 
-from synapse.util.logcontext import PreserveLoggingContext
 from synapse.types import get_domain_from_id
+from synapse.util import logcontext
 
-import logging
-
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -118,16 +115,15 @@ class ReceiptsHandler(BaseHandler):
 
         affected_room_ids = list(set([r["room_id"] for r in receipts]))
 
-        with PreserveLoggingContext():
-            self.notifier.on_new_event(
-                "receipt_key", max_batch_id, rooms=affected_room_ids
-            )
-            # Note that the min here shouldn't be relied upon to be accurate.
-            self.hs.get_pusherpool().on_new_receipts(
-                min_batch_id, max_batch_id, affected_room_ids
-            )
+        self.notifier.on_new_event(
+            "receipt_key", max_batch_id, rooms=affected_room_ids
+        )
+        # Note that the min here shouldn't be relied upon to be accurate.
+        self.hs.get_pusherpool().on_new_receipts(
+            min_batch_id, max_batch_id, affected_room_ids,
+        )
 
-            defer.returnValue(True)
+        defer.returnValue(True)
 
     @logcontext.preserve_fn   # caller should not yield on this
     @defer.inlineCallbacks
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 7e52adda3c..1e53f2c635 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -18,14 +18,19 @@ import logging
 
 from twisted.internet import defer
 
+from synapse import types
 from synapse.api.errors import (
-    AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError
+    AuthError,
+    Codes,
+    InvalidCaptchaError,
+    RegistrationError,
+    SynapseError,
 )
 from synapse.http.client import CaptchaServerHttpClient
-from synapse import types
-from synapse.types import UserID, create_requester, RoomID, RoomAlias
-from synapse.util.async import run_on_reactor, Linearizer
+from synapse.types import RoomAlias, RoomID, UserID, create_requester
+from synapse.util.async_helpers import Linearizer
 from synapse.util.threepids import check_3pid_allowed
+
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
@@ -40,7 +45,7 @@ class RegistrationHandler(BaseHandler):
             hs (synapse.server.HomeServer):
         """
         super(RegistrationHandler, self).__init__(hs)
-
+        self.hs = hs
         self.auth = hs.get_auth()
         self._auth_handler = hs.get_auth_handler()
         self.profile_handler = hs.get_profile_handler()
@@ -120,13 +125,14 @@ class RegistrationHandler(BaseHandler):
         guest_access_token=None,
         make_guest=False,
         admin=False,
+        threepid=None,
     ):
         """Registers a new client on the server.
 
         Args:
             localpart : The local part of the user ID to register. If None,
               one will be generated.
-            password (str) : The password to assign to this user so they can
+            password (unicode) : The password to assign to this user so they can
               login again. This can be None which means they cannot login again
               via a password (e.g. the user is an application service user).
             generate_token (bool): Whether a new access token should be
@@ -139,7 +145,8 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
-        yield run_on_reactor()
+
+        yield self.auth.check_auth_blocking(threepid=threepid)
         password_hash = None
         if password:
             password_hash = yield self.auth_handler().hash(password)
@@ -284,6 +291,7 @@ class RegistrationHandler(BaseHandler):
                 400,
                 "User ID can only contain characters a-z, 0-9, or '=_-./'",
             )
+        yield self.auth.check_auth_blocking()
         user = UserID(localpart, self.hs.hostname)
         user_id = user.to_string()
 
@@ -431,11 +439,9 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
-        yield run_on_reactor()
-
         if localpart is None:
             raise SynapseError(400, "Request must include user id")
-
+        yield self.auth.check_auth_blocking()
         need_register = True
 
         try:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 2abd63ad05..c3f820b975 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -15,23 +15,29 @@
 # limitations under the License.
 
 """Contains functions for performing events on rooms."""
-from twisted.internet import defer
+import itertools
+import logging
+import math
+import string
+from collections import OrderedDict
 
-from ._base import BaseHandler
+from six import string_types
+
+from twisted.internet import defer
 
-from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
 from synapse.api.constants import (
-    EventTypes, JoinRules, RoomCreationPreset
+    DEFAULT_ROOM_VERSION,
+    KNOWN_ROOM_VERSIONS,
+    EventTypes,
+    JoinRules,
+    RoomCreationPreset,
 )
-from synapse.api.errors import AuthError, StoreError, SynapseError
+from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
+from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
 from synapse.visibility import filter_events_for_client
 
-from collections import OrderedDict
-
-import logging
-import math
-import string
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -92,15 +98,34 @@ class RoomCreationHandler(BaseHandler):
         Raises:
             SynapseError if the room ID couldn't be stored, or something went
             horribly wrong.
+            ResourceLimitError if server is blocked to some resource being
+            exceeded
         """
         user_id = requester.user.to_string()
 
+        self.auth.check_auth_blocking(user_id)
+
         if not self.spam_checker.user_may_create_room(user_id):
             raise SynapseError(403, "You are not permitted to create rooms")
 
         if ratelimit:
             yield self.ratelimit(requester)
 
+        room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
+        if not isinstance(room_version, string_types):
+            raise SynapseError(
+                400,
+                "room_version must be a string",
+                Codes.BAD_JSON,
+            )
+
+        if room_version not in KNOWN_ROOM_VERSIONS:
+            raise SynapseError(
+                400,
+                "Your homeserver does not support this room version",
+                Codes.UNSUPPORTED_ROOM_VERSION,
+            )
+
         if "room_alias_name" in config:
             for wchar in string.whitespace:
                 if wchar in config["room_alias_name"]:
@@ -115,7 +140,11 @@ class RoomCreationHandler(BaseHandler):
             )
 
             if mapping:
-                raise SynapseError(400, "Room alias already taken")
+                raise SynapseError(
+                    400,
+                    "Room alias already taken",
+                    Codes.ROOM_IN_USE
+                )
         else:
             room_alias = None
 
@@ -182,6 +211,9 @@ class RoomCreationHandler(BaseHandler):
 
         creation_content = config.get("creation_content", {})
 
+        # override any attempt to set room versions via the creation_content
+        creation_content["room_version"] = room_version
+
         room_member_handler = self.hs.get_room_member_handler()
 
         yield self._send_events_for_new_room(
@@ -394,9 +426,13 @@ class RoomCreationHandler(BaseHandler):
             )
 
 
-class RoomContextHandler(BaseHandler):
+class RoomContextHandler(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+
     @defer.inlineCallbacks
-    def get_event_context(self, user, room_id, event_id, limit):
+    def get_event_context(self, user, room_id, event_id, limit, event_filter):
         """Retrieves events, pagination tokens and state around a given event
         in a room.
 
@@ -406,6 +442,8 @@ class RoomContextHandler(BaseHandler):
             event_id (str)
             limit (int): The maximum number of events to return in total
                 (excluding state).
+            event_filter (Filter|None): the filter to apply to the events returned
+                (excluding the target event_id)
 
         Returns:
             dict, or None if the event isn't found
@@ -413,8 +451,6 @@ class RoomContextHandler(BaseHandler):
         before_limit = math.floor(limit / 2.)
         after_limit = limit - before_limit
 
-        now_token = yield self.hs.get_event_sources().get_current_token()
-
         users = yield self.store.get_users_in_room(room_id)
         is_peeking = user.to_string() not in users
 
@@ -440,7 +476,7 @@ class RoomContextHandler(BaseHandler):
             )
 
         results = yield self.store.get_events_around(
-            room_id, event_id, before_limit, after_limit
+            room_id, event_id, before_limit, after_limit, event_filter
         )
 
         results["events_before"] = yield filter_evts(results["events_before"])
@@ -452,16 +488,35 @@ class RoomContextHandler(BaseHandler):
         else:
             last_event_id = event_id
 
+        types = None
+        filtered_types = None
+        if event_filter and event_filter.lazy_load_members():
+            members = set(ev.sender for ev in itertools.chain(
+                results["events_before"],
+                (results["event"],),
+                results["events_after"],
+            ))
+            filtered_types = [EventTypes.Member]
+            types = [(EventTypes.Member, member) for member in members]
+
+        # XXX: why do we return the state as of the last event rather than the
+        # first? Shouldn't we be consistent with /sync?
+        # https://github.com/matrix-org/matrix-doc/issues/687
+
         state = yield self.store.get_state_for_events(
-            [last_event_id], None
+            [last_event_id], types, filtered_types=filtered_types,
         )
         results["state"] = list(state[last_event_id].values())
 
-        results["start"] = now_token.copy_and_replace(
+        # We use a dummy token here as we only care about the room portion of
+        # the token, which we replace.
+        token = StreamToken.START
+
+        results["start"] = token.copy_and_replace(
             "room_key", results["start"]
         ).to_string()
 
-        results["end"] = now_token.copy_and_replace(
+        results["end"] = token.copy_and_replace(
             "room_key", results["end"]
         ).to_string()
 
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index fc507cef36..37e41afd61 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -13,26 +13,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import logging
+from collections import namedtuple
 
 from six import iteritems
 from six.moves import range
 
-from ._base import BaseHandler
+import msgpack
+from unpaddedbase64 import decode_base64, encode_base64
+
+from twisted.internet import defer
 
-from synapse.api.constants import (
-    EventTypes, JoinRules,
-)
-from synapse.util.async import concurrently_execute
+from synapse.api.constants import EventTypes, JoinRules
+from synapse.types import ThirdPartyInstanceID
+from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.types import ThirdPartyInstanceID
-
-from collections import namedtuple
-from unpaddedbase64 import encode_base64, decode_base64
 
-import logging
-import msgpack
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -40,7 +38,7 @@ REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
 
 
 # This is used to indicate we should only return rooms published to the main list.
-EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
+EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 
 
 class RoomListHandler(BaseHandler):
@@ -52,7 +50,7 @@ class RoomListHandler(BaseHandler):
 
     def get_local_public_room_list(self, limit=None, since_token=None,
                                    search_filter=None,
-                                   network_tuple=EMTPY_THIRD_PARTY_ID,):
+                                   network_tuple=EMPTY_THIRD_PARTY_ID,):
         """Generate a local public room list.
 
         There are multiple different lists: the main one plus one per third
@@ -89,7 +87,7 @@ class RoomListHandler(BaseHandler):
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
                               search_filter=None,
-                              network_tuple=EMTPY_THIRD_PARTY_ID,):
+                              network_tuple=EMPTY_THIRD_PARTY_ID,):
         if since_token and since_token != "END":
             since_token = RoomListNextBatch.from_token(since_token)
         else:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index f930e939e8..f643619047 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -21,19 +21,17 @@ from six.moves import http_client
 
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
-from twisted.internet import defer
 from unpaddedbase64 import decode_base64
 
+from twisted.internet import defer
+
 import synapse.server
 import synapse.types
-from synapse.api.constants import (
-    EventTypes, Membership,
-)
-from synapse.api.errors import AuthError, SynapseError, Codes
-from synapse.types import UserID, RoomID
-from synapse.util.async import Linearizer
-from synapse.util.distributor import user_left_room, user_joined_room
-
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.types import RoomID, UserID
+from synapse.util.async_helpers import Linearizer
+from synapse.util.distributor import user_joined_room, user_left_room
 
 logger = logging.getLogger(__name__)
 
@@ -203,7 +201,9 @@ class RoomMemberHandler(object):
             ratelimit=ratelimit,
         )
 
-        prev_member_event_id = context.prev_state_ids.get(
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+        prev_member_event_id = prev_state_ids.get(
             (EventTypes.Member, target.to_string()),
             None
         )
@@ -344,6 +344,7 @@ class RoomMemberHandler(object):
         latest_event_ids = (
             event_id for (event_id, _, _) in prev_events_and_hashes
         )
+
         current_state_ids = yield self.state_handler.get_current_state_ids(
             room_id, latest_event_ids=latest_event_ids,
         )
@@ -498,9 +499,10 @@ class RoomMemberHandler(object):
         if prev_event is not None:
             return
 
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
         if event.membership == Membership.JOIN:
             if requester.is_guest:
-                guest_can_join = yield self._can_guest_join(context.prev_state_ids)
+                guest_can_join = yield self._can_guest_join(prev_state_ids)
                 if not guest_can_join:
                     # This should be an auth check, but guests are a local concept,
                     # so don't really fit into the general auth process.
@@ -519,7 +521,7 @@ class RoomMemberHandler(object):
             ratelimit=ratelimit,
         )
 
-        prev_member_event_id = context.prev_state_ids.get(
+        prev_member_event_id = prev_state_ids.get(
             (EventTypes.Member, event.state_key),
             None
         )
@@ -707,6 +709,10 @@ class RoomMemberHandler(object):
             inviter_display_name = member_event.content.get("displayname", "")
             inviter_avatar_url = member_event.content.get("avatar_url", "")
 
+        # if user has no display name, default to their MXID
+        if not inviter_display_name:
+            inviter_display_name = user.to_string()
+
         canonical_room_alias = ""
         canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
         if canonical_alias_event:
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 493aec1e48..acc6eb8099 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -20,15 +20,24 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError
 from synapse.handlers.room_member import RoomMemberHandler
 from synapse.replication.http.membership import (
-    remote_join, remote_reject_invite, get_or_register_3pid_guest,
-    notify_user_membership_change,
+    ReplicationRegister3PIDGuestRestServlet as Repl3PID,
+    ReplicationRemoteJoinRestServlet as ReplRemoteJoin,
+    ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite,
+    ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft,
 )
 
-
 logger = logging.getLogger(__name__)
 
 
 class RoomMemberWorkerHandler(RoomMemberHandler):
+    def __init__(self, hs):
+        super(RoomMemberWorkerHandler, self).__init__(hs)
+
+        self._get_register_3pid_client = Repl3PID.make_client(hs)
+        self._remote_join_client = ReplRemoteJoin.make_client(hs)
+        self._remote_reject_client = ReplRejectInvite.make_client(hs)
+        self._notify_change_client = ReplJoinedLeft.make_client(hs)
+
     @defer.inlineCallbacks
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
         """Implements RoomMemberHandler._remote_join
@@ -36,10 +45,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
         if len(remote_room_hosts) == 0:
             raise SynapseError(404, "No known servers")
 
-        ret = yield remote_join(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        ret = yield self._remote_join_client(
             requester=requester,
             remote_room_hosts=remote_room_hosts,
             room_id=room_id,
@@ -54,10 +60,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
         """Implements RoomMemberHandler._remote_reject_invite
         """
-        return remote_reject_invite(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._remote_reject_client(
             requester=requester,
             remote_room_hosts=remote_room_hosts,
             room_id=room_id,
@@ -67,10 +70,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def _user_joined_room(self, target, room_id):
         """Implements RoomMemberHandler._user_joined_room
         """
-        return notify_user_membership_change(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._notify_change_client(
             user_id=target.to_string(),
             room_id=room_id,
             change="joined",
@@ -79,10 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def _user_left_room(self, target, room_id):
         """Implements RoomMemberHandler._user_left_room
         """
-        return notify_user_membership_change(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._notify_change_client(
             user_id=target.to_string(),
             room_id=room_id,
             change="left",
@@ -91,10 +88,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
         """Implements RoomMemberHandler.get_or_register_3pid_guest
         """
-        return get_or_register_3pid_guest(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._get_register_3pid_client(
             requester=requester,
             medium=medium,
             address=address,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 1eca26aa1e..c464adbd0b 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -13,21 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import itertools
+import logging
 
-from ._base import BaseHandler
+from unpaddedbase64 import decode_base64, encode_base64
 
-from synapse.api.constants import Membership, EventTypes
-from synapse.api.filtering import Filter
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
+from synapse.api.filtering import Filter
 from synapse.events.utils import serialize_event
 from synapse.visibility import filter_events_for_client
 
-from unpaddedbase64 import decode_base64, encode_base64
-
-import itertools
-import logging
-
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -64,6 +63,13 @@ class SearchHandler(BaseHandler):
             except Exception:
                 raise SynapseError(400, "Invalid batch")
 
+        logger.info(
+            "Search batch properties: %r, %r, %r",
+            batch_group, batch_group_key, batch_token,
+        )
+
+        logger.info("Search content: %s", content)
+
         try:
             room_cat = content["search_categories"]["room_events"]
 
@@ -271,6 +277,8 @@ class SearchHandler(BaseHandler):
             # We should never get here due to the guard earlier.
             raise NotImplementedError()
 
+        logger.info("Found %d events to return", len(allowed_events))
+
         # If client has asked for "context" for each event (i.e. some surrounding
         # events and state), fetch that
         if event_context is not None:
@@ -279,7 +287,12 @@ class SearchHandler(BaseHandler):
             contexts = {}
             for event in allowed_events:
                 res = yield self.store.get_events_around(
-                    event.room_id, event.event_id, before_limit, after_limit
+                    event.room_id, event.event_id, before_limit, after_limit,
+                )
+
+                logger.info(
+                    "Context for search returned %d and %d events",
+                    len(res["events_before"]), len(res["events_after"]),
                 )
 
                 res["events_before"] = yield filter_events_for_client(
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
index e057ae54c9..7ecdede4dc 100644
--- a/synapse/handlers/set_password.py
+++ b/synapse/handlers/set_password.py
@@ -17,6 +17,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import Codes, StoreError, SynapseError
+
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 51ec727df0..ef20c2296c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
-# Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,24 +14,34 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.api.constants import Membership, EventTypes
-from synapse.util.async import concurrently_execute
+import collections
+import itertools
+import logging
+
+from six import iteritems, itervalues
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.push.clientformat import format_push_rules_for_user
+from synapse.types import RoomStreamToken
+from synapse.util.async_helpers import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
+from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
-from synapse.util.caches.response_cache import ResponseCache
-from synapse.push.clientformat import format_push_rules_for_user
 from synapse.visibility import filter_events_for_client
-from synapse.types import RoomStreamToken
 
-from twisted.internet import defer
-
-import collections
-import logging
-import itertools
+logger = logging.getLogger(__name__)
 
-from six import itervalues, iteritems
+# Store the cache that tracks which lazy-loaded members have been sent to a given
+# client for no more than 30 minutes.
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
 
-logger = logging.getLogger(__name__)
+# Remember the last 100 members we sent to a client for the purposes of
+# avoiding redundantly sending the same lazy-loaded members to the client
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
 
 
 SyncConfig = collections.namedtuple("SyncConfig", [
@@ -64,6 +75,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
     "ephemeral",
     "account_data",
     "unread_notifications",
+    "summary",
 ])):
     __slots__ = []
 
@@ -145,7 +157,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
     "invited",  # InvitedSyncResult for each invited room.
     "archived",  # ArchivedSyncResult for each archived room.
     "to_device",  # List of direct messages for the device.
-    "device_lists",  # List of user_ids whose devices have chanegd
+    "device_lists",  # List of user_ids whose devices have changed
     "device_one_time_keys_count",  # Dict of algorithm to count for one time keys
                                    # for this device
     "groups",
@@ -173,6 +185,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
 class SyncHandler(object):
 
     def __init__(self, hs):
+        self.hs_config = hs.config
         self.store = hs.get_datastore()
         self.notifier = hs.get_notifier()
         self.presence_handler = hs.get_presence_handler()
@@ -180,20 +193,35 @@ class SyncHandler(object):
         self.clock = hs.get_clock()
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
+        self.auth = hs.get_auth()
+
+        # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+        self.lazy_loaded_members_cache = ExpiringCache(
+            "lazy_loaded_members_cache", self.clock,
+            max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+        )
 
+    @defer.inlineCallbacks
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
         return an empty sync result.
         Returns:
-            A Deferred SyncResult.
+            Deferred[SyncResult]
         """
-        return self.response_cache.wrap(
+        # If the user is not part of the mau group, then check that limits have
+        # not been exceeded (if not part of the group by this point, almost certain
+        # auth_blocking will occur)
+        user_id = sync_config.user.to_string()
+        yield self.auth.check_auth_blocking(user_id)
+
+        res = yield self.response_cache.wrap(
             sync_config.request_key,
             self._wait_for_sync_for_user,
             sync_config, since_token, timeout, full_state,
         )
+        defer.returnValue(res)
 
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
@@ -416,29 +444,44 @@ class SyncHandler(object):
         ))
 
     @defer.inlineCallbacks
-    def get_state_after_event(self, event):
+    def get_state_after_event(self, event, types=None, filtered_types=None):
         """
         Get the room state after the given event
 
         Args:
             event(synapse.events.EventBase): event of interest
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+                May be None, which matches any key.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
         """
-        state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+        state_ids = yield self.store.get_state_ids_for_event(
+            event.event_id, types, filtered_types=filtered_types,
+        )
         if event.is_state():
             state_ids = state_ids.copy()
             state_ids[(event.type, event.state_key)] = event.event_id
         defer.returnValue(state_ids)
 
     @defer.inlineCallbacks
-    def get_state_at(self, room_id, stream_position):
+    def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
         """ Get the room state at a particular stream position
 
         Args:
             room_id(str): room for which to get state
             stream_position(StreamToken): point at which to get state
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
@@ -453,7 +496,9 @@ class SyncHandler(object):
 
         if last_events:
             last_event = last_events[-1]
-            state = yield self.get_state_after_event(last_event)
+            state = yield self.get_state_after_event(
+                last_event, types, filtered_types=filtered_types,
+            )
 
         else:
             # no events in this room - so presumably no state
@@ -461,9 +506,141 @@ class SyncHandler(object):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
+    def compute_summary(self, room_id, sync_config, batch, state, now_token):
+        """ Works out a room summary block for this room, summarising the number
+        of joined members in the room, and providing the 'hero' members if the
+        room has no name so clients can consistently name rooms.  Also adds
+        state events to 'state' if needed to describe the heroes.
+
+        Args:
+            room_id(str):
+            sync_config(synapse.handlers.sync.SyncConfig):
+            batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
+                the room that will be sent to the user.
+            state(dict): dict of (type, state_key) -> Event as returned by
+                compute_state_delta
+            now_token(str): Token of the end of the current batch.
+
+        Returns:
+             A deferred dict describing the room summary
+        """
+
+        # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
+        last_events, _ = yield self.store.get_recent_event_ids_for_room(
+            room_id, end_token=now_token.room_key, limit=1,
+        )
+
+        if not last_events:
+            defer.returnValue(None)
+            return
+
+        last_event = last_events[-1]
+        state_ids = yield self.store.get_state_ids_for_event(
+            last_event.event_id, [
+                (EventTypes.Member, None),
+                (EventTypes.Name, ''),
+                (EventTypes.CanonicalAlias, ''),
+            ]
+        )
+
+        member_ids = {
+            state_key: event_id
+            for (t, state_key), event_id in state_ids.iteritems()
+            if t == EventTypes.Member
+        }
+        name_id = state_ids.get((EventTypes.Name, ''))
+        canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
+
+        summary = {}
+
+        # FIXME: it feels very heavy to load up every single membership event
+        # just to calculate the counts.
+        member_events = yield self.store.get_events(member_ids.values())
+
+        joined_user_ids = []
+        invited_user_ids = []
+
+        for ev in member_events.values():
+            if ev.content.get("membership") == Membership.JOIN:
+                joined_user_ids.append(ev.state_key)
+            elif ev.content.get("membership") == Membership.INVITE:
+                invited_user_ids.append(ev.state_key)
+
+        # TODO: only send these when they change.
+        summary["m.joined_member_count"] = len(joined_user_ids)
+        summary["m.invited_member_count"] = len(invited_user_ids)
+
+        if name_id or canonical_alias_id:
+            defer.returnValue(summary)
+
+        # FIXME: order by stream ordering, not alphabetic
+
+        me = sync_config.user.to_string()
+        if (joined_user_ids or invited_user_ids):
+            summary['m.heroes'] = sorted(
+                [
+                    user_id
+                    for user_id in (joined_user_ids + invited_user_ids)
+                    if user_id != me
+                ]
+            )[0:5]
+        else:
+            summary['m.heroes'] = sorted(
+                [user_id for user_id in member_ids.keys() if user_id != me]
+            )[0:5]
+
+        if not sync_config.filter_collection.lazy_load_members():
+            defer.returnValue(summary)
+
+        # ensure we send membership events for heroes if needed
+        cache_key = (sync_config.user.to_string(), sync_config.device_id)
+        cache = self.get_lazy_loaded_members_cache(cache_key)
+
+        # track which members the client should already know about via LL:
+        # Ones which are already in state...
+        existing_members = set(
+            user_id for (typ, user_id) in state.keys()
+            if typ == EventTypes.Member
+        )
+
+        # ...or ones which are in the timeline...
+        for ev in batch.events:
+            if ev.type == EventTypes.Member:
+                existing_members.add(ev.state_key)
+
+        # ...and then ensure any missing ones get included in state.
+        missing_hero_event_ids = [
+            member_ids[hero_id]
+            for hero_id in summary['m.heroes']
+            if (
+                cache.get(hero_id) != member_ids[hero_id] and
+                hero_id not in existing_members
+            )
+        ]
+
+        missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
+        missing_hero_state = missing_hero_state.values()
+
+        for s in missing_hero_state:
+            cache.set(s.state_key, s.event_id)
+            state[(EventTypes.Member, s.state_key)] = s
+
+        defer.returnValue(summary)
+
+    def get_lazy_loaded_members_cache(self, cache_key):
+        cache = self.lazy_loaded_members_cache.get(cache_key)
+        if cache is None:
+            logger.debug("creating LruCache for %r", cache_key)
+            cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+            self.lazy_loaded_members_cache[cache_key] = cache
+        else:
+            logger.debug("found LruCache for %r", cache_key)
+        return cache
+
+    @defer.inlineCallbacks
     def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
                             full_state):
-        """ Works out the differnce in state between the start of the timeline
+        """ Works out the difference in state between the start of the timeline
         and the previous sync.
 
         Args:
@@ -477,7 +654,7 @@ class SyncHandler(object):
             full_state(bool): Whether to force returning the full state.
 
         Returns:
-             A deferred new event dictionary
+             A deferred dict of (type, state_key) -> Event
         """
         # TODO(mjark) Check if the state events were received by the server
         # after the previous sync, since we need to include those state
@@ -485,59 +662,130 @@ class SyncHandler(object):
         # TODO(mjark) Check for new redactions in the state events.
 
         with Measure(self.clock, "compute_state_delta"):
+
+            types = None
+            filtered_types = None
+
+            lazy_load_members = sync_config.filter_collection.lazy_load_members()
+            include_redundant_members = (
+                sync_config.filter_collection.include_redundant_members()
+            )
+
+            if lazy_load_members:
+                # We only request state for the members needed to display the
+                # timeline:
+
+                types = [
+                    (EventTypes.Member, state_key)
+                    for state_key in set(
+                        event.sender  # FIXME: we also care about invite targets etc.
+                        for event in batch.events
+                    )
+                ]
+
+                # only apply the filtering to room members
+                filtered_types = [EventTypes.Member]
+
+            timeline_state = {
+                (event.type, event.state_key): event.event_id
+                for event in batch.events if event.is_state()
+            }
+
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[-1].event_id
+                        batch.events[-1].event_id, types=types,
+                        filtered_types=filtered_types,
                     )
 
                     state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[0].event_id
+                        batch.events[0].event_id, types=types,
+                        filtered_types=filtered_types,
                     )
+
                 else:
                     current_state_ids = yield self.get_state_at(
-                        room_id, stream_position=now_token
+                        room_id, stream_position=now_token, types=types,
+                        filtered_types=filtered_types,
                     )
 
                     state_ids = current_state_ids
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_ids,
                     previous={},
                     current=current_state_ids,
+                    lazy_load_members=lazy_load_members,
                 )
             elif batch.limited:
                 state_at_previous_sync = yield self.get_state_at(
-                    room_id, stream_position=since_token
+                    room_id, stream_position=since_token, types=types,
+                    filtered_types=filtered_types,
                 )
 
                 current_state_ids = yield self.store.get_state_ids_for_event(
-                    batch.events[-1].event_id
+                    batch.events[-1].event_id, types=types,
+                    filtered_types=filtered_types,
                 )
 
                 state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id
+                    batch.events[0].event_id, types=types,
+                    filtered_types=filtered_types,
                 )
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_at_timeline_start,
                     previous=state_at_previous_sync,
                     current=current_state_ids,
+                    lazy_load_members=lazy_load_members,
                 )
             else:
                 state_ids = {}
+                if lazy_load_members:
+                    if types:
+                        # We're returning an incremental sync, with no "gap" since
+                        # the previous sync, so normally there would be no state to return
+                        # But we're lazy-loading, so the client might need some more
+                        # member events to understand the events in this timeline.
+                        # So we fish out all the member events corresponding to the
+                        # timeline here, and then dedupe any redundant ones below.
+
+                        state_ids = yield self.store.get_state_ids_for_event(
+                            batch.events[0].event_id, types=types,
+                            filtered_types=None,  # we only want members!
+                        )
+
+            if lazy_load_members and not include_redundant_members:
+                cache_key = (sync_config.user.to_string(), sync_config.device_id)
+                cache = self.get_lazy_loaded_members_cache(cache_key)
+
+                # if it's a new sync sequence, then assume the client has had
+                # amnesia and doesn't want any recent lazy-loaded members
+                # de-duplicated.
+                if since_token is None:
+                    logger.debug("clearing LruCache for %r", cache_key)
+                    cache.clear()
+                else:
+                    # only send members which aren't in our LruCache (either
+                    # because they're new to this client or have been pushed out
+                    # of the cache)
+                    logger.debug("filtering state from %r...", state_ids)
+                    state_ids = {
+                        t: event_id
+                        for t, event_id in state_ids.iteritems()
+                        if cache.get(t[1]) != event_id
+                    }
+                    logger.debug("...to %r", state_ids)
+
+                # add any member IDs we are about to send into our LruCache
+                for t, event_id in itertools.chain(
+                    state_ids.items(),
+                    timeline_state.items(),
+                ):
+                    if t[0] == EventTypes.Member:
+                        cache.set(t[1], event_id)
 
         state = {}
         if state_ids:
@@ -620,7 +868,7 @@ class SyncHandler(object):
             since_token is None and
             sync_config.filter_collection.blocks_all_presence()
         )
-        if not block_all_presence_data:
+        if self.hs_config.use_presence and not block_all_presence_data:
             yield self._generate_sync_entry_for_presence(
                 sync_result_builder, newly_joined_rooms, newly_joined_users
             )
@@ -1312,7 +1560,6 @@ class SyncHandler(object):
             if events == [] and tags is None:
                 return
 
-        since_token = sync_result_builder.since_token
         now_token = sync_result_builder.now_token
         sync_config = sync_result_builder.sync_config
 
@@ -1355,6 +1602,18 @@ class SyncHandler(object):
             full_state=full_state
         )
 
+        summary = {}
+        if (
+            sync_config.filter_collection.lazy_load_members() and
+            (
+                any(ev.type == EventTypes.Member for ev in batch.events) or
+                since_token is None
+            )
+        ):
+            summary = yield self.compute_summary(
+                room_id, sync_config, batch, state, now_token
+            )
+
         if room_builder.rtype == "joined":
             unread_notifications = {}
             room_sync = JoinedSyncResult(
@@ -1364,6 +1623,7 @@ class SyncHandler(object):
                 ephemeral=ephemeral,
                 account_data=account_data_events,
                 unread_notifications=unread_notifications,
+                summary=summary,
             )
 
             if room_sync or always_include:
@@ -1448,7 +1708,9 @@ def _action_has_highlight(actions):
     return False
 
 
-def _calculate_state(timeline_contains, timeline_start, previous, current):
+def _calculate_state(
+    timeline_contains, timeline_start, previous, current, lazy_load_members,
+):
     """Works out what state to include in a sync response.
 
     Args:
@@ -1457,6 +1719,9 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
         previous (dict): state at the end of the previous sync (or empty dict
             if this is an initial sync)
         current (dict): state at the end of the timeline
+        lazy_load_members (bool): whether to return members from timeline_start
+            or not.  assumes that timeline_start has already been filtered to
+            include only the members the client needs to know about.
 
     Returns:
         dict
@@ -1472,9 +1737,25 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
     }
 
     c_ids = set(e for e in current.values())
-    tc_ids = set(e for e in timeline_contains.values())
-    p_ids = set(e for e in previous.values())
     ts_ids = set(e for e in timeline_start.values())
+    p_ids = set(e for e in previous.values())
+    tc_ids = set(e for e in timeline_contains.values())
+
+    # If we are lazyloading room members, we explicitly add the membership events
+    # for the senders in the timeline into the state block returned by /sync,
+    # as we may not have sent them to the client before.  We find these membership
+    # events by filtering them out of timeline_start, which has already been filtered
+    # to only include membership events for the senders in the timeline.
+    # In practice, we can do this by removing them from the p_ids list,
+    # which is the list of relevant state we know we have already sent to the client.
+    # see https://github.com/matrix-org/synapse/pull/2970
+    #            /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
+
+    if lazy_load_members:
+        p_ids.difference_update(
+            e for t, e in timeline_start.iteritems()
+            if t[0] == EventTypes.Member
+        )
 
     state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 5d9736e88f..2d2d3d5a0d 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -13,17 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+from collections import namedtuple
+
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError, AuthError
+from synapse.api.errors import AuthError, SynapseError
+from synapse.types import UserID, get_domain_from_id
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID, get_domain_from_id
-
-import logging
-
-from collections import namedtuple
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index a39f0f7343..d8413d6aa7 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -14,15 +14,15 @@
 # limitations under the License.
 
 import logging
+
+from six import iteritems
+
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.storage.roommember import ProfileInfo
-from synapse.util.metrics import Measure
-from synapse.util.async import sleep
 from synapse.types import get_localpart_from_id
-
-from six import iteritems
+from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
@@ -119,6 +119,8 @@ class UserDirectoryHandler(object):
         """Called to update index of our local user profiles when they change
         irrespective of any rooms the user may be in.
         """
+        # FIXME(#3714): We should probably do this in the same worker as all
+        # the other changes.
         yield self.store.update_profile_in_user_dir(
             user_id, profile.display_name, profile.avatar_url, None,
         )
@@ -127,6 +129,8 @@ class UserDirectoryHandler(object):
     def handle_user_deactivated(self, user_id):
         """Called when a user ID is deactivated
         """
+        # FIXME(#3714): We should probably do this in the same worker as all
+        # the other changes.
         yield self.store.remove_from_user_dir(user_id)
         yield self.store.remove_from_user_in_public_room(user_id)
 
@@ -174,7 +178,7 @@ class UserDirectoryHandler(object):
             logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
             yield self._handle_initial_room(room_id)
             num_processed_rooms += 1
-            yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
 
         logger.info("Processed all rooms.")
 
@@ -188,7 +192,7 @@ class UserDirectoryHandler(object):
                 logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
                 yield self._handle_local_user(user_id)
                 num_processed_users += 1
-                yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
+                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
 
             logger.info("Processed all users")
 
@@ -236,7 +240,7 @@ class UserDirectoryHandler(object):
         count = 0
         for user_id in user_ids:
             if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
 
             if not self.is_mine_id(user_id):
                 count += 1
@@ -251,7 +255,7 @@ class UserDirectoryHandler(object):
                     continue
 
                 if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
                 count += 1
 
                 user_set = (user_id, other_user_id)