author    Matthew Hodgson <matthew@matrix.org>  2018-10-22 20:19:40 +0200
committer Matthew Hodgson <matthew@matrix.org>  2018-10-22 20:19:40 +0200
commit    c7503f8f335bda84a2c40b659a409df05538868d
tree      8844135554b54605a14287f63a5fe5a0959b0ad8 /synapse/handlers
parent    fix missing import and run isort
parent    Use recaptcha_ajax.js directly from Google
merge in master
Diffstat (limited to 'synapse/handlers')
-rw-r--r--  synapse/handlers/__init__.py                 4
-rw-r--r--  synapse/handlers/_base.py                    3
-rw-r--r--  synapse/handlers/appservice.py              15
-rw-r--r--  synapse/handlers/auth.py                    55
-rw-r--r--  synapse/handlers/deactivate_account.py      13
-rw-r--r--  synapse/handlers/device.py                   2
-rw-r--r--  synapse/handlers/events.py                  25
-rw-r--r--  synapse/handlers/federation.py             409
-rw-r--r--  synapse/handlers/identity.py                57
-rw-r--r--  synapse/handlers/initial_sync.py            44
-rw-r--r--  synapse/handlers/message.py                406
-rw-r--r--  synapse/handlers/pagination.py             298
-rw-r--r--  synapse/handlers/presence.py                33
-rw-r--r--  synapse/handlers/profile.py                 98
-rw-r--r--  synapse/handlers/read_marker.py              2
-rw-r--r--  synapse/handlers/receipts.py                18
-rw-r--r--  synapse/handlers/register.py                11
-rw-r--r--  synapse/handlers/room.py                    74
-rw-r--r--  synapse/handlers/room_list.py                2
-rw-r--r--  synapse/handlers/room_member.py             15
-rw-r--r--  synapse/handlers/room_member_worker.py      41
-rw-r--r--  synapse/handlers/search.py                   2
-rw-r--r--  synapse/handlers/sync.py                   336
23 files changed, 1293 insertions(+), 670 deletions(-)
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 4b9923d8c0..413425fed1 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,9 +17,7 @@ from .admin import AdminHandler
 from .directory import DirectoryHandler
 from .federation import FederationHandler
 from .identity import IdentityHandler
-from .message import MessageHandler
 from .register import RegistrationHandler
-from .room import RoomContextHandler
 from .search import SearchHandler
 
 
@@ -44,10 +42,8 @@ class Handlers(object):
 
     def __init__(self, hs):
         self.registration_handler = RegistrationHandler(hs)
-        self.message_handler = MessageHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
-        self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index b6a8b3aa3b..704181d2d3 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -112,8 +112,9 @@ class BaseHandler(object):
             guest_access = event.content.get("guest_access", "forbidden")
             if guest_access != "can_join":
                 if context:
+                    current_state_ids = yield context.get_current_state_ids(self.store)
                     current_state = yield self.store.get_events(
-                        list(context.current_state_ids.values())
+                        list(current_state_ids.values())
                     )
                 else:
                     current_state = yield self.state_handler.get_current_state(
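
A minimal sketch of the EventContext change above: the state IDs move
from a plain attribute (context.current_state_ids) to a deferred getter
that takes the store, so they can be fetched lazily. The class and the
store method below are illustrative stand-ins, not Synapse's real
EventContext API.

    from twisted.internet import defer

    class LazyEventContext(object):
        """Hypothetical context whose state IDs are loaded on demand."""

        def __init__(self, state_group):
            self._state_group = state_group
            self._current_state_ids = None

        @defer.inlineCallbacks
        def get_current_state_ids(self, store):
            # Fetch from the store on first use, then cache. The store
            # method name here is assumed for illustration.
            if self._current_state_ids is None:
                self._current_state_ids = yield store.get_state_ids_for_group(
                    self._state_group,
                )
            defer.returnValue(self._current_state_ids)
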
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ec9fe01a5a..f0f89af7dc 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -23,6 +23,11 @@ from twisted.internet import defer
 
 import synapse
 from synapse.api.constants import EventTypes
+from synapse.metrics import (
+    event_processing_loop_counter,
+    event_processing_loop_room_count,
+)
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.metrics import Measure
 
@@ -106,7 +111,9 @@ class ApplicationServicesHandler(object):
                             yield self._check_user_exists(event.state_key)
 
                         if not self.started_scheduler:
-                            self.scheduler.start().addErrback(log_failure)
+                            def start_scheduler():
+                                return self.scheduler.start().addErrback(log_failure)
+                            run_as_background_process("as_scheduler", start_scheduler)
                             self.started_scheduler = True
 
                         # Fork off pushes to these services
@@ -133,6 +140,12 @@ class ApplicationServicesHandler(object):
 
                     events_processed_counter.inc(len(events))
 
+                    event_processing_loop_room_count.labels(
+                        "appservice_sender"
+                    ).inc(len(events_by_room))
+
+                    event_processing_loop_counter.labels("appservice_sender").inc()
+
                     synapse.metrics.event_processing_lag.labels(
                         "appservice_sender").set(now - ts)
                     synapse.metrics.event_processing_last_ts.labels(
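
The scheduler change above swaps a bare fire-and-forget Deferred for a
named background process. A toy version of that wrapper, assuming plain
Twisted (Synapse's real helper additionally manages log contexts and
exports metrics):

    import logging

    from twisted.internet import defer

    logger = logging.getLogger(__name__)

    def run_as_background_process(desc, func):
        """Run func as a tracked background task; log any failure
        instead of letting it vanish."""
        d = defer.maybeDeferred(func)
        d.addErrback(
            lambda failure: logger.error(
                "Background process %r failed: %s", desc, failure,
            )
        )
        return d

    # Usage mirroring the diff:
    #   run_as_background_process("as_scheduler", start_scheduler)
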
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 402e44cdef..4a81bd2ba9 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import logging
+import unicodedata
 
 import attr
 import bcrypt
@@ -519,6 +520,7 @@ class AuthHandler(BaseHandler):
         """
         logger.info("Logging in user %s on device %s", user_id, device_id)
         access_token = yield self.issue_access_token(user_id, device_id)
+        yield self.auth.check_auth_blocking(user_id)
 
         # the device *should* have been registered before we got here; however,
         # it's possible we raced against a DELETE operation. The thing we
@@ -626,6 +628,7 @@ class AuthHandler(BaseHandler):
         # special case to check for "password" for the check_password interface
         # for the auth providers
         password = login_submission.get("password")
+
         if login_type == LoginType.PASSWORD:
             if not self._password_enabled:
                 raise SynapseError(400, "Password login has been disabled.")
@@ -707,9 +710,10 @@ class AuthHandler(BaseHandler):
         multiple inexact matches.
 
         Args:
-            user_id (str): complete @user:id
+            user_id (unicode): complete @user:id
+            password (unicode): the provided password
         Returns:
-            (str) the canonical_user_id, or None if unknown user / bad password
+            (unicode) the canonical_user_id, or None if unknown user / bad password
         """
         lookupres = yield self._find_user_id_and_pwd_hash(user_id)
         if not lookupres:
@@ -728,15 +732,18 @@ class AuthHandler(BaseHandler):
                                                   device_id)
         defer.returnValue(access_token)
 
+    @defer.inlineCallbacks
     def validate_short_term_login_token_and_get_user_id(self, login_token):
         auth_api = self.hs.get_auth()
+        user_id = None
         try:
             macaroon = pymacaroons.Macaroon.deserialize(login_token)
             user_id = auth_api.get_user_id_from_macaroon(macaroon)
             auth_api.validate_macaroon(macaroon, "login", True, user_id)
-            return user_id
         except Exception:
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+        yield self.auth.check_auth_blocking(user_id)
+        defer.returnValue(user_id)
 
     @defer.inlineCallbacks
     def delete_access_token(self, access_token):
@@ -821,12 +828,26 @@ class AuthHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def delete_threepid(self, user_id, medium, address):
+        """Attempts to unbind the 3pid on the identity servers and deletes it
+        from the local database.
+
+        Args:
+            user_id (str)
+            medium (str)
+            address (str)
+
+        Returns:
+            Deferred[bool]: Returns True if successfully unbound the 3pid on
+            the identity server, False if identity server doesn't support the
+            unbind API.
+        """
+
         # 'Canonicalise' email addresses as per above
         if medium == 'email':
             address = address.lower()
 
         identity_handler = self.hs.get_handlers().identity_handler
-        yield identity_handler.unbind_threepid(
+        result = yield identity_handler.try_unbind_threepid(
             user_id,
             {
                 'medium': medium,
@@ -834,10 +855,10 @@ class AuthHandler(BaseHandler):
             },
         )
 
-        ret = yield self.store.user_delete_threepid(
+        yield self.store.user_delete_threepid(
             user_id, medium, address,
         )
-        defer.returnValue(ret)
+        defer.returnValue(result)
 
     def _save_session(self, session):
         # TODO: Persistent storage
@@ -849,14 +870,19 @@ class AuthHandler(BaseHandler):
         """Computes a secure hash of password.
 
         Args:
-            password (str): Password to hash.
+            password (unicode): Password to hash.
 
         Returns:
-            Deferred(str): Hashed password.
+            Deferred(unicode): Hashed password.
         """
         def _do_hash():
-            return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
-                                 bcrypt.gensalt(self.bcrypt_rounds))
+            # Normalise the Unicode in the password
+            pw = unicodedata.normalize("NFKC", password)
+
+            return bcrypt.hashpw(
+                pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
+                bcrypt.gensalt(self.bcrypt_rounds),
+            ).decode('ascii')
 
         return make_deferred_yieldable(
             threads.deferToThreadPool(
@@ -868,16 +894,19 @@ class AuthHandler(BaseHandler):
         """Validates that self.hash(password) == stored_hash.
 
         Args:
-            password (str): Password to hash.
-            stored_hash (str): Expected hash value.
+            password (unicode): Password to hash.
+            stored_hash (unicode): Expected hash value.
 
         Returns:
             Deferred(bool): Whether self.hash(password) == stored_hash.
         """
 
         def _do_validate_hash():
+            # Normalise the Unicode in the password
+            pw = unicodedata.normalize("NFKC", password)
+
             return bcrypt.checkpw(
-                password.encode('utf8') + self.hs.config.password_pepper,
+                pw.encode('utf8') + self.hs.config.password_pepper.encode("utf8"),
                 stored_hash.encode('utf8')
             )
 
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 3a08208fd8..3bd59d2dd4 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -52,7 +52,8 @@ class DeactivateAccountHandler(BaseHandler):
             erase_data (bool): whether to GDPR-erase the user's data
 
         Returns:
-            Deferred
+            Deferred[bool]: True if identity server supports removing
+            threepids, otherwise False.
         """
         # FIXME: Theoretically there is a race here wherein user resets
         # password using threepid.
@@ -61,16 +62,22 @@ class DeactivateAccountHandler(BaseHandler):
         # leave the user still active so they can try again.
         # Ideally we would prevent password resets and then do this in the
         # background thread.
+
+        # This will be set to false if the identity server doesn't support
+        # unbinding
+        identity_server_supports_unbinding = True
+
         threepids = yield self.store.user_get_threepids(user_id)
         for threepid in threepids:
             try:
-                yield self._identity_handler.unbind_threepid(
+                result = yield self._identity_handler.try_unbind_threepid(
                     user_id,
                     {
                         'medium': threepid['medium'],
                         'address': threepid['address'],
                     },
                 )
+                identity_server_supports_unbinding &= result
             except Exception:
                 # Do we want this to be a fatal error or should we carry on?
                 logger.exception("Failed to remove threepid from ID server")
@@ -107,6 +114,8 @@ class DeactivateAccountHandler(BaseHandler):
         # parts users from rooms (if it isn't already running)
         self._start_user_parting()
 
+        defer.returnValue(identity_server_supports_unbinding)
+
     def _start_user_parting(self):
         """
         Start the process that goes through the table of users
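
The loop above ANDs each identity server's answer into a single flag,
so deactivation reports True only when every server supported the
unbind API. The same logic in isolation (try_unbind is a stand-in
callable, not a Synapse function):

    def unbind_all_threepids(threepids, try_unbind):
        identity_server_supports_unbinding = True
        for threepid in threepids:
            try:
                result = try_unbind(threepid)
                identity_server_supports_unbinding &= result
            except Exception:
                # As in the handler: carry on rather than abort the
                # whole deactivation.
                pass
        return identity_server_supports_unbinding
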
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 2d44f15da3..9e017116a9 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import FederationDeniedError
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util import stringutils
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.metrics import measure_func
 from synapse.util.retryutils import NotRetryingDestination
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index c3f2d7feff..f772e62c28 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -19,10 +19,12 @@ import random
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import AuthError
 from synapse.events import EventBase
 from synapse.events.utils import serialize_event
 from synapse.types import UserID
 from synapse.util.logutils import log_function
+from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
@@ -129,11 +131,13 @@ class EventStreamHandler(BaseHandler):
 class EventHandler(BaseHandler):
 
     @defer.inlineCallbacks
-    def get_event(self, user, event_id):
+    def get_event(self, user, room_id, event_id):
         """Retrieve a single specified event.
 
         Args:
             user (synapse.types.UserID): The user requesting the event
+            room_id (str|None): The expected room id. We'll return None if the
+                event's room does not match.
             event_id (str): The event ID to obtain.
         Returns:
             dict: An event, or None if there is no event matching this ID.
@@ -142,13 +146,26 @@ class EventHandler(BaseHandler):
             AuthError if the user does not have the rights to inspect this
             event.
         """
-        event = yield self.store.get_event(event_id)
+        event = yield self.store.get_event(event_id, check_room_id=room_id)
 
         if not event:
             defer.returnValue(None)
             return
 
-        if hasattr(event, "room_id"):
-            yield self.auth.check_joined_room(event.room_id, user.to_string())
+        users = yield self.store.get_users_in_room(event.room_id)
+        is_peeking = user.to_string() not in users
+
+        filtered = yield filter_events_for_client(
+            self.store,
+            user.to_string(),
+            [event],
+            is_peeking=is_peeking
+        )
+
+        if not filtered:
+            raise AuthError(
+                403,
+                "You don't have permission to access that event."
+            )
 
         defer.returnValue(event)
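
The rewritten get_event replaces a bare joined-room check with the
standard visibility filter: a requester who is not in the room is
treated as peeking, and an event the filter drops becomes a 403. In
outline (filter_for_client stands in for
synapse.visibility.filter_events_for_client):

    def check_event_visibility(event, requester_id, users_in_room,
                               filter_for_client):
        is_peeking = requester_id not in users_in_room
        filtered = filter_for_client(
            requester_id, [event], is_peeking=is_peeking,
        )
        if not filtered:
            raise PermissionError(
                "You don't have permission to access that event."
            )
        return event
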
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 20fb46fc89..3dd107a285 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -21,8 +21,8 @@ import logging
 import sys
 
 import six
-from six import iteritems
-from six.moves import http_client
+from six import iteritems, itervalues
+from six.moves import http_client, zip
 
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
@@ -30,7 +30,12 @@ from unpaddedbase64 import decode_base64
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, Membership, RejectedReason
+from synapse.api.constants import (
+    KNOWN_ROOM_VERSIONS,
+    EventTypes,
+    Membership,
+    RejectedReason,
+)
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
@@ -44,10 +49,15 @@ from synapse.crypto.event_signing import (
     compute_event_signature,
 )
 from synapse.events.validator import EventValidator
+from synapse.replication.http.federation import (
+    ReplicationCleanRoomRestServlet,
+    ReplicationFederationSendEventsRestServlet,
+)
+from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
 from synapse.state import resolve_events_with_factory
 from synapse.types import UserID, get_domain_from_id
 from synapse.util import logcontext, unwrapFirstError
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room
 from synapse.util.frozenutils import unfreeze
 from synapse.util.logutils import log_function
@@ -76,7 +86,7 @@ class FederationHandler(BaseHandler):
         self.hs = hs
 
         self.store = hs.get_datastore()
-        self.replication_layer = hs.get_federation_client()
+        self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
@@ -86,6 +96,18 @@ class FederationHandler(BaseHandler):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
         self._server_notices_mxid = hs.config.server_notices_mxid
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+
+        self._send_events_to_master = (
+            ReplicationFederationSendEventsRestServlet.make_client(hs)
+        )
+        self._notify_user_membership_change = (
+            ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
+        )
+        self._clean_room_for_join_client = (
+            ReplicationCleanRoomRestServlet.make_client(hs)
+        )
 
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
@@ -255,7 +277,7 @@ class FederationHandler(BaseHandler):
                     # know about
                     for p in prevs - seen:
                         state, got_auth_chain = (
-                            yield self.replication_layer.get_state_for_room(
+                            yield self.federation_client.get_state_for_room(
                                 origin, pdu.room_id, p
                             )
                         )
@@ -338,7 +360,7 @@ class FederationHandler(BaseHandler):
         #
         # see https://github.com/matrix-org/synapse/pull/1744
 
-        missing_events = yield self.replication_layer.get_missing_events(
+        missing_events = yield self.federation_client.get_missing_events(
             origin,
             pdu.room_id,
             earliest_events_ids=list(latest),
@@ -400,7 +422,7 @@ class FederationHandler(BaseHandler):
             )
 
             try:
-                event_stream_id, max_stream_id = yield self._persist_auth_tree(
+                yield self._persist_auth_tree(
                     origin, auth_chain, state, event
                 )
             except AuthError as e:
@@ -444,7 +466,7 @@ class FederationHandler(BaseHandler):
                 yield self._handle_new_events(origin, event_infos)
 
             try:
-                context, event_stream_id, max_stream_id = yield self._handle_new_event(
+                context = yield self._handle_new_event(
                     origin,
                     event,
                     state=state,
@@ -469,24 +491,16 @@ class FederationHandler(BaseHandler):
             except StoreError:
                 logger.exception("Failed to store room.")
 
-        extra_users = []
-        if event.type == EventTypes.Member:
-            target_user_id = event.state_key
-            target_user = UserID.from_string(target_user_id)
-            extra_users.append(target_user)
-
-        self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id,
-            extra_users=extra_users
-        )
-
         if event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
                # Only fire user_joined_room if the user has actually
                 # joined the room. Don't bother if the user is just
                 # changing their profile info.
                 newly_joined = True
-                prev_state_id = context.prev_state_ids.get(
+
+                prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+                prev_state_id = prev_state_ids.get(
                     (event.type, event.state_key)
                 )
                 if prev_state_id:
@@ -498,7 +512,7 @@ class FederationHandler(BaseHandler):
 
                 if newly_joined:
                     user = UserID.from_string(event.state_key)
-                    yield user_joined_room(self.distributor, user, event.room_id)
+                    yield self.user_joined_room(user, event.room_id)
 
     @log_function
     @defer.inlineCallbacks
@@ -519,7 +533,7 @@ class FederationHandler(BaseHandler):
         if dest == self.server_name:
             raise SynapseError(400, "Can't backfill from self.")
 
-        events = yield self.replication_layer.backfill(
+        events = yield self.federation_client.backfill(
             dest,
             room_id,
             limit=limit,
@@ -567,7 +581,7 @@ class FederationHandler(BaseHandler):
         state_events = {}
         events_to_state = {}
         for e_id in edges:
-            state, auth = yield self.replication_layer.get_state_for_room(
+            state, auth = yield self.federation_client.get_state_for_room(
                 destination=dest,
                 room_id=room_id,
                 event_id=e_id
@@ -609,7 +623,7 @@ class FederationHandler(BaseHandler):
                 results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
                     [
                         logcontext.run_in_background(
-                            self.replication_layer.get_pdu,
+                            self.federation_client.get_pdu,
                             [dest],
                             event_id,
                             outlier=True,
@@ -731,7 +745,7 @@ class FederationHandler(BaseHandler):
             """
             joined_users = [
                 (state_key, int(event.depth))
-                for (e_type, state_key), event in state.iteritems()
+                for (e_type, state_key), event in iteritems(state)
                 if e_type == EventTypes.Member
                 and event.membership == Membership.JOIN
             ]
@@ -748,7 +762,7 @@ class FederationHandler(BaseHandler):
                 except Exception:
                     pass
 
-            return sorted(joined_domains.iteritems(), key=lambda d: d[1])
+            return sorted(joined_domains.items(), key=lambda d: d[1])
 
         curr_domains = get_domains_from_state(curr_state)
 
@@ -811,7 +825,7 @@ class FederationHandler(BaseHandler):
         tried_domains = set(likely_domains)
         tried_domains.add(self.server_name)
 
-        event_ids = list(extremities.iterkeys())
+        event_ids = list(extremities.keys())
 
         logger.debug("calling resolve_state_groups in _maybe_backfill")
         resolve = logcontext.preserve_fn(
@@ -827,15 +841,15 @@ class FederationHandler(BaseHandler):
         states = dict(zip(event_ids, [s.state for s in states]))
 
         state_map = yield self.store.get_events(
-            [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
+            [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
             get_prev_content=False
         )
         states = {
             key: {
                 k: state_map[e_id]
-                for k, e_id in state_dict.iteritems()
+                for k, e_id in iteritems(state_dict)
                 if e_id in state_map
-            } for key, state_dict in states.iteritems()
+            } for key, state_dict in iteritems(states)
         }
 
         for e_id, _ in sorted_extremeties_tuple:
@@ -890,7 +904,7 @@ class FederationHandler(BaseHandler):
 
         Invites must be signed by the invitee's server before distribution.
         """
-        pdu = yield self.replication_layer.send_invite(
+        pdu = yield self.federation_client.send_invite(
             destination=target_host,
             room_id=event.room_id,
             event_id=event.event_id,
@@ -906,16 +920,6 @@ class FederationHandler(BaseHandler):
             [auth_id for auth_id, _ in event.auth_events],
             include_given=True
         )
-
-        for event in auth:
-            event.signatures.update(
-                compute_event_signature(
-                    event,
-                    self.hs.hostname,
-                    self.hs.config.signing_key[0]
-                )
-            )
-
         defer.returnValue([e for e in auth])
 
     @log_function
@@ -940,6 +944,9 @@ class FederationHandler(BaseHandler):
             joinee,
             "join",
             content,
+            params={
+                "ver": KNOWN_ROOM_VERSIONS,
+            },
         )
 
         # This shouldn't happen, because the RoomMemberHandler has a
@@ -949,7 +956,7 @@ class FederationHandler(BaseHandler):
 
         self.room_queues[room_id] = []
 
-        yield self.store.clean_room_for_join(room_id)
+        yield self._clean_room_for_join(room_id)
 
         handled_events = set()
 
@@ -962,7 +969,7 @@ class FederationHandler(BaseHandler):
                 target_hosts.insert(0, origin)
             except ValueError:
                 pass
-            ret = yield self.replication_layer.send_join(target_hosts, event)
+            ret = yield self.federation_client.send_join(target_hosts, event)
 
             origin = ret["origin"]
             state = ret["state"]
@@ -988,15 +995,10 @@ class FederationHandler(BaseHandler):
                 # FIXME
                 pass
 
-            event_stream_id, max_stream_id = yield self._persist_auth_tree(
+            yield self._persist_auth_tree(
                 origin, auth_chain, state, event
             )
 
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=[joinee]
-            )
-
             logger.debug("Finished joining %s to %s", joinee, room_id)
         finally:
             room_queue = self.room_queues[room_id]
@@ -1091,7 +1093,7 @@ class FederationHandler(BaseHandler):
         # would introduce the danger of backwards-compatibility problems.
         event.internal_metadata.send_on_behalf_of = origin
 
-        context, event_stream_id, max_stream_id = yield self._handle_new_event(
+        context = yield self._handle_new_event(
             origin, event
         )
 
@@ -1101,25 +1103,17 @@ class FederationHandler(BaseHandler):
             event.signatures,
         )
 
-        extra_users = []
-        if event.type == EventTypes.Member:
-            target_user_id = event.state_key
-            target_user = UserID.from_string(target_user_id)
-            extra_users.append(target_user)
-
-        self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id, extra_users=extra_users
-        )
-
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.JOIN:
                 user = UserID.from_string(event.state_key)
-                yield user_joined_room(self.distributor, user, event.room_id)
+                yield self.user_joined_room(user, event.room_id)
 
-        state_ids = list(context.prev_state_ids.values())
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+        state_ids = list(prev_state_ids.values())
         auth_chain = yield self.store.get_auth_chain(state_ids)
 
-        state = yield self.store.get_events(list(context.prev_state_ids.values()))
+        state = yield self.store.get_events(list(prev_state_ids.values()))
 
         defer.returnValue({
             "state": list(state.values()),
@@ -1181,17 +1175,7 @@ class FederationHandler(BaseHandler):
         )
 
         context = yield self.state_handler.compute_event_context(event)
-
-        event_stream_id, max_stream_id = yield self.store.persist_event(
-            event,
-            context=context,
-        )
-
-        target_user = UserID.from_string(event.state_key)
-        self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id,
-            extra_users=[target_user],
-        )
+        yield self.persist_events_and_notify([(event, context)])
 
         defer.returnValue(event)
 
@@ -1216,35 +1200,26 @@ class FederationHandler(BaseHandler):
         except ValueError:
             pass
 
-        yield self.replication_layer.send_leave(
+        yield self.federation_client.send_leave(
             target_hosts,
             event
         )
 
         context = yield self.state_handler.compute_event_context(event)
-
-        event_stream_id, max_stream_id = yield self.store.persist_event(
-            event,
-            context=context,
-        )
-
-        target_user = UserID.from_string(event.state_key)
-        self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id,
-            extra_users=[target_user],
-        )
+        yield self.persist_events_and_notify([(event, context)])
 
         defer.returnValue(event)
 
     @defer.inlineCallbacks
     def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
-                               content={},):
-        origin, pdu = yield self.replication_layer.make_membership_event(
+                               content={}, params=None):
+        origin, pdu = yield self.federation_client.make_membership_event(
             target_hosts,
             room_id,
             user_id,
             membership,
             content,
+            params=params,
         )
 
         logger.debug("Got response to make_%s: %s", membership, pdu)
@@ -1284,7 +1259,7 @@ class FederationHandler(BaseHandler):
     @log_function
     def on_make_leave_request(self, room_id, user_id):
         """ We've received a /make_leave/ request, so we create a partial
-        join event for the room and return that. We do *not* persist or
+        leave event for the room and return that. We do *not* persist or
         process it until the other server has signed it and sent it back.
         """
         builder = self.event_builder_factory.new({
@@ -1323,7 +1298,7 @@ class FederationHandler(BaseHandler):
 
         event.internal_metadata.outlier = False
 
-        context, event_stream_id, max_stream_id = yield self._handle_new_event(
+        yield self._handle_new_event(
             origin, event
         )
 
@@ -1333,22 +1308,17 @@ class FederationHandler(BaseHandler):
             event.signatures,
         )
 
-        extra_users = []
-        if event.type == EventTypes.Member:
-            target_user_id = event.state_key
-            target_user = UserID.from_string(target_user_id)
-            extra_users.append(target_user)
-
-        self.notifier.on_new_room_event(
-            event, event_stream_id, max_stream_id, extra_users=extra_users
-        )
-
         defer.returnValue(None)
 
     @defer.inlineCallbacks
     def get_state_for_pdu(self, room_id, event_id):
         """Returns the state at the event. i.e. not including said event.
         """
+
+        event = yield self.store.get_event(
+            event_id, allow_none=False, check_room_id=room_id,
+        )
+
         state_groups = yield self.store.get_state_groups(
             room_id, [event_id]
         )
@@ -1359,8 +1329,7 @@ class FederationHandler(BaseHandler):
                 (e.type, e.state_key): e for e in state
             }
 
-            event = yield self.store.get_event(event_id)
-            if event and event.is_state():
+            if event.is_state():
                 # Get previous state
                 if "replaces_state" in event.unsigned:
                     prev_id = event.unsigned["replaces_state"]
@@ -1371,18 +1340,6 @@ class FederationHandler(BaseHandler):
                     del results[(event.type, event.state_key)]
 
             res = list(results.values())
-            for event in res:
-                # We sign these again because there was a bug where we
-                # incorrectly signed things the first time round
-                if self.is_mine_id(event.event_id):
-                    event.signatures.update(
-                        compute_event_signature(
-                            event,
-                            self.hs.hostname,
-                            self.hs.config.signing_key[0]
-                        )
-                    )
-
             defer.returnValue(res)
         else:
             defer.returnValue([])
@@ -1391,6 +1348,10 @@ class FederationHandler(BaseHandler):
     def get_state_ids_for_pdu(self, room_id, event_id):
         """Returns the state at the event. i.e. not including said event.
         """
+        event = yield self.store.get_event(
+            event_id, allow_none=False, check_room_id=room_id,
+        )
+
         state_groups = yield self.store.get_state_groups_ids(
             room_id, [event_id]
         )
@@ -1399,8 +1360,7 @@ class FederationHandler(BaseHandler):
             _, state = state_groups.items().pop()
             results = state
 
-            event = yield self.store.get_event(event_id)
-            if event and event.is_state():
+            if event.is_state():
                 # Get previous state
                 if "replaces_state" in event.unsigned:
                     prev_id = event.unsigned["replaces_state"]
@@ -1454,18 +1414,6 @@ class FederationHandler(BaseHandler):
         )
 
         if event:
-            if self.is_mine_id(event.event_id):
-                # FIXME: This is a temporary work around where we occasionally
-                # return events slightly differently than when they were
-                # originally signed
-                event.signatures.update(
-                    compute_event_signature(
-                        event,
-                        self.hs.hostname,
-                        self.hs.config.signing_key[0]
-                    )
-                )
-
             in_room = yield self.auth.check_host_in_room(
                 event.room_id,
                 origin
@@ -1501,9 +1449,8 @@ class FederationHandler(BaseHandler):
                     event, context
                 )
 
-            event_stream_id, max_stream_id = yield self.store.persist_event(
-                event,
-                context=context,
+            yield self.persist_events_and_notify(
+                [(event, context)],
                 backfilled=backfilled,
             )
         except:  # noqa: E722, as we reraise the exception this is fine.
@@ -1516,15 +1463,7 @@ class FederationHandler(BaseHandler):
 
             six.reraise(tp, value, tb)
 
-        if not backfilled:
-            # this intentionally does not yield: we don't care about the result
-            # and don't need to wait for it.
-            logcontext.run_in_background(
-                self.pusher_pool.on_new_notifications,
-                event_stream_id, max_stream_id,
-            )
-
-        defer.returnValue((context, event_stream_id, max_stream_id))
+        defer.returnValue(context)
 
     @defer.inlineCallbacks
     def _handle_new_events(self, origin, event_infos, backfilled=False):
@@ -1532,6 +1471,8 @@ class FederationHandler(BaseHandler):
         should not depend on one another, e.g. this should be used to persist
         a bunch of outliers, but not a chunk of individual events that depend
         on each other for state calculations.
+
+        Notifies about the events where appropriate.
         """
         contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
@@ -1546,10 +1487,10 @@ class FederationHandler(BaseHandler):
             ], consumeErrors=True,
         ))
 
-        yield self.store.persist_events(
+        yield self.persist_events_and_notify(
             [
                 (ev_info["event"], context)
-                for ev_info, context in itertools.izip(event_infos, contexts)
+                for ev_info, context in zip(event_infos, contexts)
             ],
             backfilled=backfilled,
         )
@@ -1558,7 +1499,8 @@ class FederationHandler(BaseHandler):
     def _persist_auth_tree(self, origin, auth_events, state, event):
         """Checks the auth chain is valid (and passes auth checks) for the
         state and event. Then persists the auth chain and state atomically.
-        Persists the event seperately.
+        Persists the event separately. Notifies about the persisted events
+        where appropriate.
 
         Will attempt to fetch missing auth events.
 
@@ -1569,8 +1511,7 @@ class FederationHandler(BaseHandler):
             event (Event)
 
         Returns:
-            2-tuple of (event_stream_id, max_stream_id) from the persist_event
-            call for `event`
+            Deferred
         """
         events_to_context = {}
         for e in itertools.chain(auth_events, state):
@@ -1596,7 +1537,7 @@ class FederationHandler(BaseHandler):
                     missing_auth_events.add(e_id)
 
         for e_id in missing_auth_events:
-            m_ev = yield self.replication_layer.get_pdu(
+            m_ev = yield self.federation_client.get_pdu(
                 [origin],
                 e_id,
                 outlier=True,
@@ -1634,7 +1575,7 @@ class FederationHandler(BaseHandler):
                     raise
                 events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
 
-        yield self.store.persist_events(
+        yield self.persist_events_and_notify(
             [
                 (e, events_to_context[e.event_id])
                 for e in itertools.chain(auth_events, state)
@@ -1645,12 +1586,10 @@ class FederationHandler(BaseHandler):
             event, old_state=state
         )
 
-        event_stream_id, max_stream_id = yield self.store.persist_event(
-            event, new_event_context,
+        yield self.persist_events_and_notify(
+            [(event, new_event_context)],
         )
 
-        defer.returnValue((event_stream_id, max_stream_id))
-
     @defer.inlineCallbacks
     def _prep_event(self, origin, event, state=None, auth_events=None):
         """
@@ -1669,8 +1608,9 @@ class FederationHandler(BaseHandler):
         )
 
         if not auth_events:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=True,
+                event, prev_state_ids, for_verification=True,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -1706,8 +1646,19 @@ class FederationHandler(BaseHandler):
         defer.returnValue(context)
 
     @defer.inlineCallbacks
-    def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
+    def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects,
                       missing):
+        in_room = yield self.auth.check_host_in_room(
+            room_id,
+            origin
+        )
+        if not in_room:
+            raise AuthError(403, "Host not in room.")
+
+        event = yield self.store.get_event(
+            event_id, allow_none=False, check_room_id=room_id
+        )
+
         # Just go through and process each event in `remote_auth_chain`. We
         # don't want to fall into the trap of `missing` being wrong.
         for e in remote_auth_chain:
@@ -1717,7 +1668,6 @@ class FederationHandler(BaseHandler):
                 pass
 
         # Now get the current auth_chain for the event.
-        event = yield self.store.get_event(event_id)
         local_auth_chain = yield self.store.get_auth_chain(
             [auth_id for auth_id, _ in event.auth_events],
             include_given=True
@@ -1730,15 +1680,6 @@ class FederationHandler(BaseHandler):
             local_auth_chain, remote_auth_chain
         )
 
-        for event in ret["auth_chain"]:
-            event.signatures.update(
-                compute_event_signature(
-                    event,
-                    self.hs.hostname,
-                    self.hs.config.signing_key[0]
-                )
-            )
-
         logger.debug("on_query_auth returning: %s", ret)
 
         defer.returnValue(ret)
@@ -1814,7 +1755,7 @@ class FederationHandler(BaseHandler):
             logger.info("Missing auth: %s", missing_auth)
             # If we don't have all the auth events, we need to get them.
             try:
-                remote_auth_chain = yield self.replication_layer.get_event_auth(
+                remote_auth_chain = yield self.federation_client.get_event_auth(
                     origin, event.room_id, event.event_id
                 )
 
@@ -1919,9 +1860,10 @@ class FederationHandler(BaseHandler):
                         break
 
             if do_resolution:
+                prev_state_ids = yield context.get_prev_state_ids(self.store)
                 # 1. Get what we think is the auth chain.
                 auth_ids = yield self.auth.compute_auth_events(
-                    event, context.prev_state_ids
+                    event, prev_state_ids
                 )
                 local_auth_chain = yield self.store.get_auth_chain(
                     auth_ids, include_given=True
@@ -1929,7 +1871,7 @@ class FederationHandler(BaseHandler):
 
                 try:
                     # 2. Get remote difference.
-                    result = yield self.replication_layer.query_auth(
+                    result = yield self.federation_client.query_auth(
                         origin,
                         event.room_id,
                         event.event_id,
@@ -2011,21 +1953,34 @@ class FederationHandler(BaseHandler):
             k: a.event_id for k, a in iteritems(auth_events)
             if k != event_key
         }
-        context.current_state_ids = dict(context.current_state_ids)
-        context.current_state_ids.update(state_updates)
-        if context.delta_ids is not None:
-            context.delta_ids = dict(context.delta_ids)
-            context.delta_ids.update(state_updates)
-        context.prev_state_ids = dict(context.prev_state_ids)
-        context.prev_state_ids.update({
+        current_state_ids = yield context.get_current_state_ids(self.store)
+        current_state_ids = dict(current_state_ids)
+
+        current_state_ids.update(state_updates)
+
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        prev_state_ids = dict(prev_state_ids)
+
+        prev_state_ids.update({
             k: a.event_id for k, a in iteritems(auth_events)
         })
-        context.state_group = yield self.store.store_state_group(
+
+        # create a new state group as a delta from the existing one.
+        prev_group = context.state_group
+        state_group = yield self.store.store_state_group(
             event.event_id,
             event.room_id,
-            prev_group=context.prev_group,
-            delta_ids=context.delta_ids,
-            current_state_ids=context.current_state_ids,
+            prev_group=prev_group,
+            delta_ids=state_updates,
+            current_state_ids=current_state_ids,
+        )
+
+        yield context.update_state(
+            state_group=state_group,
+            current_state_ids=current_state_ids,
+            prev_state_ids=prev_state_ids,
+            prev_group=prev_group,
+            delta_ids=state_updates,
         )
 
     @defer.inlineCallbacks
@@ -2215,7 +2170,7 @@ class FederationHandler(BaseHandler):
             yield member_handler.send_membership_event(None, event, context)
         else:
             destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
-            yield self.replication_layer.forward_third_party_invite(
+            yield self.federation_client.forward_third_party_invite(
                 destinations,
                 room_id,
                 event_dict,
@@ -2265,7 +2220,8 @@ class FederationHandler(BaseHandler):
             event.content["third_party_invite"]["signed"]["token"]
         )
         original_invite = None
-        original_invite_id = context.prev_state_ids.get(key)
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        original_invite_id = prev_state_ids.get(key)
         if original_invite_id:
             original_invite = yield self.store.get_event(
                 original_invite_id, allow_none=True
@@ -2307,7 +2263,8 @@ class FederationHandler(BaseHandler):
         signed = event.content["third_party_invite"]["signed"]
         token = signed["token"]
 
-        invite_event_id = context.prev_state_ids.get(
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        invite_event_id = prev_state_ids.get(
             (EventTypes.ThirdPartyInvite, token,)
         )
 
@@ -2357,7 +2314,7 @@ class FederationHandler(BaseHandler):
                 for revocation.
         """
         try:
-            response = yield self.hs.get_simple_http_client().get_json(
+            response = yield self.http_client.get_json(
                 url,
                 {"public_key": public_key}
             )
@@ -2368,3 +2325,91 @@ class FederationHandler(BaseHandler):
             )
         if "valid" not in response or not response["valid"]:
             raise AuthError(403, "Third party certificate was invalid")
+
+    @defer.inlineCallbacks
+    def persist_events_and_notify(self, event_and_contexts, backfilled=False):
+        """Persists events and tells the notifier/pushers about them, if
+        necessary.
+
+        Args:
+            event_and_contexts(list[tuple[FrozenEvent, EventContext]])
+            backfilled (bool): Whether these events are a result of
+                backfilling or not
+
+        Returns:
+            Deferred
+        """
+        if self.config.worker_app:
+            yield self._send_events_to_master(
+                store=self.store,
+                event_and_contexts=event_and_contexts,
+                backfilled=backfilled
+            )
+        else:
+            max_stream_id = yield self.store.persist_events(
+                event_and_contexts,
+                backfilled=backfilled,
+            )
+
+            if not backfilled:  # Never notify for backfilled events
+                for event, _ in event_and_contexts:
+                    self._notify_persisted_event(event, max_stream_id)
+
+    def _notify_persisted_event(self, event, max_stream_id):
+        """Checks to see if notifier/pushers should be notified about the
+        event or not.
+
+        Args:
+            event (FrozenEvent)
+            max_stream_id (int): The max_stream_id returned by persist_events
+        """
+
+        extra_users = []
+        if event.type == EventTypes.Member:
+            target_user_id = event.state_key
+
+            # We notify for memberships if its an invite for one of our
+            # users
+            if event.internal_metadata.is_outlier():
+                if event.membership != Membership.INVITE:
+                    if not self.is_mine_id(target_user_id):
+                        return
+
+            target_user = UserID.from_string(target_user_id)
+            extra_users.append(target_user)
+        elif event.internal_metadata.is_outlier():
+            return
+
+        event_stream_id = event.internal_metadata.stream_ordering
+        self.notifier.on_new_room_event(
+            event, event_stream_id, max_stream_id,
+            extra_users=extra_users
+        )
+
+        self.pusher_pool.on_new_notifications(
+            event_stream_id, max_stream_id,
+        )
+
+    def _clean_room_for_join(self, room_id):
+        """Called to clean up any data in DB for a given room, ready for the
+        server to join the room.
+
+        Args:
+            room_id (str)
+        """
+        if self.config.worker_app:
+            return self._clean_room_for_join_client(room_id)
+        else:
+            return self.store.clean_room_for_join(room_id)
+
+    def user_joined_room(self, user, room_id):
+        """Called when a new user has joined the room
+        """
+        if self.config.worker_app:
+            return self._notify_user_membership_change(
+                room_id=room_id,
+                user_id=user.to_string(),
+                change="joined",
+            )
+        else:
+            return user_joined_room(self.distributor, user, room_id)
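
Much of the federation.py diff funnels persistence through the new
persist_events_and_notify, whose _notify_persisted_event applies nested
outlier checks. Flattened, those checks reduce to the predicate below
(a restatement for clarity, not code from the diff):

    def should_notify(is_outlier, is_member_event, membership, target_is_local):
        # Non-outliers are always notified about.
        if not is_outlier:
            return True
        # Outliers that aren't membership events never notify.
        if not is_member_event:
            return False
        # Outlier membership events: notify for invites, or for events
        # targeting one of our own users.
        return membership == "invite" or target_is_local
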
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 8c8aedb2b8..5feb3f22a6 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -26,7 +26,7 @@ from twisted.internet import defer
 from synapse.api.errors import (
     CodeMessageException,
     Codes,
-    MatrixCodeMessageException,
+    HttpResponseException,
     SynapseError,
 )
 
@@ -85,7 +85,6 @@ class IdentityHandler(BaseHandler):
             )
             defer.returnValue(None)
 
-        data = {}
         try:
             data = yield self.http_client.get_json(
                 "https://%s%s" % (
@@ -94,11 +93,9 @@ class IdentityHandler(BaseHandler):
                 ),
                 {'sid': creds['sid'], 'client_secret': client_secret}
             )
-        except MatrixCodeMessageException as e:
+        except HttpResponseException as e:
             logger.info("getValidated3pid failed with Matrix error: %r", e)
-            raise SynapseError(e.code, e.msg, e.errcode)
-        except CodeMessageException as e:
-            data = json.loads(e.msg)
+            raise e.to_synapse_error()
 
         if 'medium' in data:
             defer.returnValue(data)
@@ -136,19 +133,23 @@ class IdentityHandler(BaseHandler):
             )
             logger.debug("bound threepid %r to %s", creds, mxid)
         except CodeMessageException as e:
-            data = json.loads(e.msg)
+            data = json.loads(e.msg)  # XXX WAT?
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def unbind_threepid(self, mxid, threepid):
-        """
-        Removes a binding from an identity server
+    def try_unbind_threepid(self, mxid, threepid):
+        """Removes a binding from an identity server
+
         Args:
             mxid (str): Matrix user ID of binding to be removed
             threepid (dict): Dict with medium & address of binding to be removed
 
+        Raises:
+            SynapseError: If we failed to contact the identity server
+
         Returns:
-            Deferred[bool]: True on success, otherwise False
+            Deferred[bool]: True on success, otherwise False if the identity
+            server doesn't support unbinding
         """
         logger.debug("unbinding threepid %r from %s", threepid, mxid)
         if not self.trusted_id_servers:
@@ -178,11 +179,21 @@ class IdentityHandler(BaseHandler):
             content=content,
             destination_is=id_server,
         )
-        yield self.http_client.post_json_get_json(
-            url,
-            content,
-            headers,
-        )
+        try:
+            yield self.http_client.post_json_get_json(
+                url,
+                content,
+                headers,
+            )
+        except HttpResponseException as e:
+            if e.code in (400, 404, 501,):
+                # The remote server probably doesn't support unbinding (yet)
+                logger.warn("Received %d response while unbinding threepid", e.code)
+                defer.returnValue(False)
+            else:
+                logger.error("Failed to unbind threepid on identity server: %s", e)
+                raise SynapseError(502, "Failed to contact identity server")
+
         defer.returnValue(True)
 
     @defer.inlineCallbacks
@@ -209,12 +220,9 @@ class IdentityHandler(BaseHandler):
                 params
             )
             defer.returnValue(data)
-        except MatrixCodeMessageException as e:
-            logger.info("Proxied requestToken failed with Matrix error: %r", e)
-            raise SynapseError(e.code, e.msg, e.errcode)
-        except CodeMessageException as e:
+        except HttpResponseException as e:
             logger.info("Proxied requestToken failed: %r", e)
-            raise e
+            raise e.to_synapse_error()
 
     @defer.inlineCallbacks
     def requestMsisdnToken(
@@ -244,9 +252,6 @@ class IdentityHandler(BaseHandler):
                 params
             )
             defer.returnValue(data)
-        except MatrixCodeMessageException as e:
-            logger.info("Proxied requestToken failed with Matrix error: %r", e)
-            raise SynapseError(e.code, e.msg, e.errcode)
-        except CodeMessageException as e:
+        except HttpResponseException as e:
             logger.info("Proxied requestToken failed: %r", e)
-            raise e
+            raise e.to_synapse_error()
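
try_unbind_threepid now distinguishes "the identity server doesn't
support unbinding" from a genuine failure by status code. The same
mapping with a generic HTTP client (requests is used purely for
illustration; Synapse uses its own Twisted client):

    import requests

    def try_unbind(url, content):
        resp = requests.post(url, json=content)
        if resp.status_code in (400, 404, 501):
            # The remote server probably doesn't support unbinding (yet)
            return False
        if not resp.ok:
            # Surfaced to the caller as a 502 in the handler above
            raise RuntimeError("Failed to contact identity server")
        return True
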
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index fb11716eb8..e009395207 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -25,7 +25,7 @@ from synapse.handlers.presence import format_user_presence_state
 from synapse.streams.config import PaginationConfig
 from synapse.types import StreamToken, UserID
 from synapse.util import unwrapFirstError
-from synapse.util.async import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.snapshot_cache import SnapshotCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.visibility import filter_events_for_client
@@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
             try:
                 if event.membership == Membership.JOIN:
                     room_end_token = now_token.room_key
-                    deferred_room_state = self.state_handler.get_current_state(
-                        event.room_id
+                    deferred_room_state = run_in_background(
+                        self.state_handler.get_current_state,
+                        event.room_id,
                     )
                 elif event.membership == Membership.LEAVE:
                     room_end_token = "s%d" % (event.stream_ordering,)
-                    deferred_room_state = self.store.get_state_for_events(
-                        [event.event_id], None
+                    deferred_room_state = run_in_background(
+                        self.store.get_state_for_events,
+                        [event.event_id], None,
                     )
                     deferred_room_state.addCallback(
                         lambda states: states[event.event_id]
@@ -370,6 +372,10 @@ class InitialSyncHandler(BaseHandler):
 
         @defer.inlineCallbacks
         def get_presence():
+            # If presence is disabled, return an empty list
+            if not self.hs.config.use_presence:
+                defer.returnValue([])
+
             states = yield presence_handler.get_states(
                 [m.user_id for m in room_members],
                 as_event=True,
@@ -387,19 +393,21 @@ class InitialSyncHandler(BaseHandler):
                 receipts = []
             defer.returnValue(receipts)
 
-        presence, receipts, (messages, token) = yield defer.gatherResults(
-            [
-                run_in_background(get_presence),
-                run_in_background(get_receipts),
-                run_in_background(
-                    self.store.get_recent_events_for_room,
-                    room_id,
-                    limit=limit,
-                    end_token=now_token.room_key,
-                )
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError)
+        presence, receipts, (messages, token) = yield make_deferred_yieldable(
+            defer.gatherResults(
+                [
+                    run_in_background(get_presence),
+                    run_in_background(get_receipts),
+                    run_in_background(
+                        self.store.get_recent_events_for_room,
+                        room_id,
+                        limit=limit,
+                        end_token=now_token.room_key,
+                    )
+                ],
+                consumeErrors=True,
+            ).addErrback(unwrapFirstError),
+        )
 
         messages = yield filter_events_for_client(
             self.store, user_id, messages, is_peeking=is_peeking,
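
The make_deferred_yieldable(defer.gatherResults(...)) shape introduced above is the standard logcontext-safe fan-out in this codebase: each task is launched with run_in_background so it runs off the request's log context, and the combined deferred is re-attached to the calling context before being yielded on. A self-contained sketch:

    from twisted.internet import defer

    from synapse.util import unwrapFirstError
    from synapse.util.logcontext import make_deferred_yieldable, run_in_background

    @defer.inlineCallbacks
    def gather(*tasks):
        # Launch every task off the current logcontext, then wait for all of
        # them; consumeErrors=True plus unwrapFirstError re-raises the first
        # underlying failure rather than a wrapping FirstError.
        results = yield make_deferred_yieldable(
            defer.gatherResults(
                [run_in_background(task) for task in tasks],
                consumeErrors=True,
            ).addErrback(unwrapFirstError)
        )
        defer.returnValue(results)
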
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a39b852ceb..e484061cc0 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -23,21 +23,25 @@ from canonicaljson import encode_canonical_json, json
 
 from twisted.internet import defer
 from twisted.internet.defer import succeed
-from twisted.python.failure import Failure
 
 from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
+from synapse.api.errors import (
+    AuthError,
+    Codes,
+    ConsentNotGivenError,
+    NotFoundError,
+    SynapseError,
+)
 from synapse.api.urls import ConsentURIBuilder
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
-from synapse.replication.http.send_event import send_event_to_master
-from synapse.types import RoomAlias, RoomStreamToken, UserID
-from synapse.util.async import Limiter, ReadWriteLock
+from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.types import RoomAlias, UserID
+from synapse.util.async_helpers import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
-from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
@@ -45,234 +49,15 @@ from ._base import BaseHandler
 logger = logging.getLogger(__name__)
 
 
-class PurgeStatus(object):
-    """Object tracking the status of a purge request
-
-    This class contains information on the progress of a purge request, for
-    return by get_purge_status.
-
-    Attributes:
-        status (int): Tracks whether this request has completed. One of
-            STATUS_{ACTIVE,COMPLETE,FAILED}
+class MessageHandler(object):
+    """Contains some read only APIs to get state about a room
     """
 
-    STATUS_ACTIVE = 0
-    STATUS_COMPLETE = 1
-    STATUS_FAILED = 2
-
-    STATUS_TEXT = {
-        STATUS_ACTIVE: "active",
-        STATUS_COMPLETE: "complete",
-        STATUS_FAILED: "failed",
-    }
-
-    def __init__(self):
-        self.status = PurgeStatus.STATUS_ACTIVE
-
-    def asdict(self):
-        return {
-            "status": PurgeStatus.STATUS_TEXT[self.status]
-        }
-
-
-class MessageHandler(BaseHandler):
-
     def __init__(self, hs):
-        super(MessageHandler, self).__init__(hs)
-        self.hs = hs
-        self.state = hs.get_state_handler()
+        self.auth = hs.get_auth()
         self.clock = hs.get_clock()
-
-        self.pagination_lock = ReadWriteLock()
-        self._purges_in_progress_by_room = set()
-        # map from purge id to PurgeStatus
-        self._purges_by_id = {}
-
-    def start_purge_history(self, room_id, token,
-                            delete_local_events=False):
-        """Start off a history purge on a room.
-
-        Args:
-            room_id (str): The room to purge from
-
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
-
-        Returns:
-            str: unique ID for this purge transaction.
-        """
-        if room_id in self._purges_in_progress_by_room:
-            raise SynapseError(
-                400,
-                "History purge already in progress for %s" % (room_id, ),
-            )
-
-        purge_id = random_string(16)
-
-        # we log the purge_id here so that it can be tied back to the
-        # request id in the log lines.
-        logger.info("[purge] starting purge_id %s", purge_id)
-
-        self._purges_by_id[purge_id] = PurgeStatus()
-        run_in_background(
-            self._purge_history,
-            purge_id, room_id, token, delete_local_events,
-        )
-        return purge_id
-
-    @defer.inlineCallbacks
-    def _purge_history(self, purge_id, room_id, token,
-                       delete_local_events):
-        """Carry out a history purge on a room.
-
-        Args:
-            purge_id (str): The id for this purge
-            room_id (str): The room to purge from
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
-
-        Returns:
-            Deferred
-        """
-        self._purges_in_progress_by_room.add(room_id)
-        try:
-            with (yield self.pagination_lock.write(room_id)):
-                yield self.store.purge_history(
-                    room_id, token, delete_local_events,
-                )
-            logger.info("[purge] complete")
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
-        except Exception:
-            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
-        finally:
-            self._purges_in_progress_by_room.discard(room_id)
-
-            # remove the purge from the list 24 hours after it completes
-            def clear_purge():
-                del self._purges_by_id[purge_id]
-            self.hs.get_reactor().callLater(24 * 3600, clear_purge)
-
-    def get_purge_status(self, purge_id):
-        """Get the current status of an active purge
-
-        Args:
-            purge_id (str): purge_id returned by start_purge_history
-
-        Returns:
-            PurgeStatus|None
-        """
-        return self._purges_by_id.get(purge_id)
-
-    @defer.inlineCallbacks
-    def get_messages(self, requester, room_id=None, pagin_config=None,
-                     as_client_event=True, event_filter=None):
-        """Get messages in a room.
-
-        Args:
-            requester (Requester): The user requesting messages.
-            room_id (str): The room they want messages from.
-            pagin_config (synapse.api.streams.PaginationConfig): The pagination
-                config rules to apply, if any.
-            as_client_event (bool): True to get events in client-server format.
-            event_filter (Filter): Filter to apply to results or None
-        Returns:
-            dict: Pagination API results
-        """
-        user_id = requester.user.to_string()
-
-        if pagin_config.from_token:
-            room_token = pagin_config.from_token.room_key
-        else:
-            pagin_config.from_token = (
-                yield self.hs.get_event_sources().get_current_token_for_room(
-                    room_id=room_id
-                )
-            )
-            room_token = pagin_config.from_token.room_key
-
-        room_token = RoomStreamToken.parse(room_token)
-
-        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
-            "room_key", str(room_token)
-        )
-
-        source_config = pagin_config.get_source_config("room")
-
-        with (yield self.pagination_lock.read(room_id)):
-            membership, member_event_id = yield self._check_in_room_or_world_readable(
-                room_id, user_id
-            )
-
-            if source_config.direction == 'b':
-                # if we're going backwards, we might need to backfill. This
-                # requires that we have a topo token.
-                if room_token.topological:
-                    max_topo = room_token.topological
-                else:
-                    max_topo = yield self.store.get_max_topological_token(
-                        room_id, room_token.stream
-                    )
-
-                if membership == Membership.LEAVE:
-                    # If they have left the room then clamp the token to be before
-                    # they left the room, to save the effort of loading from the
-                    # database.
-                    leave_token = yield self.store.get_topological_token_for_event(
-                        member_event_id
-                    )
-                    leave_token = RoomStreamToken.parse(leave_token)
-                    if leave_token.topological < max_topo:
-                        source_config.from_key = str(leave_token)
-
-                yield self.hs.get_handlers().federation_handler.maybe_backfill(
-                    room_id, max_topo
-                )
-
-            events, next_key = yield self.store.paginate_room_events(
-                room_id=room_id,
-                from_key=source_config.from_key,
-                to_key=source_config.to_key,
-                direction=source_config.direction,
-                limit=source_config.limit,
-                event_filter=event_filter,
-            )
-
-            next_token = pagin_config.from_token.copy_and_replace(
-                "room_key", next_key
-            )
-
-        if not events:
-            defer.returnValue({
-                "chunk": [],
-                "start": pagin_config.from_token.to_string(),
-                "end": next_token.to_string(),
-            })
-
-        if event_filter:
-            events = event_filter.filter(events)
-
-        events = yield filter_events_for_client(
-            self.store,
-            user_id,
-            events,
-            is_peeking=(member_event_id is None),
-        )
-
-        time_now = self.clock.time_msec()
-
-        chunk = {
-            "chunk": [
-                serialize_event(e, time_now, as_client_event)
-                for e in events
-            ],
-            "start": pagin_config.from_token.to_string(),
-            "end": next_token.to_string(),
-        }
-
-        defer.returnValue(chunk)
+        self.state = hs.get_state_handler()
+        self.store = hs.get_datastore()
 
     @defer.inlineCallbacks
     def get_room_data(self, user_id=None, room_id=None,
@@ -286,12 +71,12 @@ class MessageHandler(BaseHandler):
         Raises:
             SynapseError if something went wrong.
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
             room_id, user_id
         )
 
         if membership == Membership.JOIN:
-            data = yield self.state_handler.get_current_state(
+            data = yield self.state.get_current_state(
                 room_id, event_type, state_key
             )
         elif membership == Membership.LEAVE:
@@ -304,53 +89,85 @@ class MessageHandler(BaseHandler):
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def _check_in_room_or_world_readable(self, room_id, user_id):
-        try:
-            # check_user_was_in_room will return the most recent membership
-            # event for the user if:
-            #  * The user is a non-guest user, and was ever in the room
-            #  * The user is a guest user, and has joined the room
-            # else it will throw.
-            member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
-            defer.returnValue((member_event.membership, member_event.event_id))
-            return
-        except AuthError:
-            visibility = yield self.state_handler.get_current_state(
-                room_id, EventTypes.RoomHistoryVisibility, ""
-            )
-            if (
-                visibility and
-                visibility.content["history_visibility"] == "world_readable"
-            ):
-                defer.returnValue((Membership.JOIN, None))
-                return
-            raise AuthError(
-                403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
-            )
-
-    @defer.inlineCallbacks
-    def get_state_events(self, user_id, room_id, is_guest=False):
+    def get_state_events(
+        self, user_id, room_id, types=None, filtered_types=None,
+        at_token=None, is_guest=False,
+    ):
         """Retrieve all state events for a given room. If the user is
         joined to the room then return the current state. If the user has
-        left the room return the state events from when they left.
+        left the room, return the state events from when they left. If an explicit
+        'at' parameter is passed, return the state events as of that event, if
+        visible.
 
         Args:
             user_id(str): The user requesting state events.
             room_id(str): The room ID to get all state events from.
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events of the given type are returned. If `types` itself is
+                None, no filtering is applied at all.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
+            at_token(StreamToken|None): the stream token of the point at which we
+                are requesting the state. If the user is not allowed to view the
+                state as of that stream token, we raise a 403 SynapseError. If
+                None, returns the current state based on the current_state_events
+                table.
+            is_guest(bool): whether this user is a guest
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
+        Raises:
+            NotFoundError (404) if the at token does not yield an event
+
+            AuthError (403) if the user doesn't have permission to view
+            members of this room.
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
-            room_id, user_id
-        )
+        if at_token:
+            # FIXME this claims to get the state at a stream position, but
+            # get_recent_events_for_room operates by topo ordering. This therefore
+            # does not reliably give you the state at the given stream position.
+            # (https://github.com/matrix-org/synapse/issues/3305)
+            last_events, _ = yield self.store.get_recent_events_for_room(
+                room_id, end_token=at_token.room_key, limit=1,
+            )
 
-        if membership == Membership.JOIN:
-            room_state = yield self.state_handler.get_current_state(room_id)
-        elif membership == Membership.LEAVE:
-            room_state = yield self.store.get_state_for_events(
-                [membership_event_id], None
+            if not last_events:
+                raise NotFoundError("Can't find event for token %s" % (at_token, ))
+
+            visible_events = yield filter_events_for_client(
+                self.store, user_id, last_events,
+            )
+
+            event = last_events[0]
+            if visible_events:
+                room_state = yield self.store.get_state_for_events(
+                    [event.event_id], types, filtered_types=filtered_types,
+                )
+                room_state = room_state[event.event_id]
+            else:
+                raise AuthError(
+                    403,
+                    "User %s not allowed to view events in room %s at token %s" % (
+                        user_id, room_id, at_token,
+                    )
+                )
+        else:
+            membership, membership_event_id = (
+                yield self.auth.check_in_room_or_world_readable(
+                    room_id, user_id,
+                )
             )
-            room_state = room_state[membership_event_id]
+
+            if membership == Membership.JOIN:
+                state_ids = yield self.store.get_filtered_current_state_ids(
+                    room_id, types, filtered_types=filtered_types,
+                )
+                room_state = yield self.store.get_events(state_ids.values())
+            elif membership == Membership.LEAVE:
+                room_state = yield self.store.get_state_for_events(
+                    [membership_event_id], types, filtered_types=filtered_types,
+                )
+                room_state = room_state[membership_event_id]
 
         now = self.clock.time_msec()
         defer.returnValue(
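
A hypothetical call showing how the new types/filtered_types parameters combine for lazy-loading: member events are filtered down to the requesting user, while every other state type is returned unfiltered:

    from twisted.internet import defer

    from synapse.api.constants import EventTypes

    @defer.inlineCallbacks
    def get_lazy_state(message_handler, user_id, room_id):
        state = yield message_handler.get_state_events(
            user_id, room_id,
            # only this user's m.room.member event...
            types=[(EventTypes.Member, user_id)],
            # ...applying that filter to member events alone; all other
            # state (name, topic, power levels, ...) comes back untouched.
            filtered_types=[EventTypes.Member],
        )
        defer.returnValue(state)
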
@@ -373,7 +190,7 @@ class MessageHandler(BaseHandler):
         if not requester.app_service:
             # We check AS auth after fetching the room membership, as it
             # requires us to pull out all joined members anyway.
-            membership, _ = yield self._check_in_room_or_world_readable(
+            membership, _ = yield self.auth.check_in_room_or_world_readable(
                 room_id, user_id
             )
             if membership != Membership.JOIN:
@@ -418,7 +235,7 @@ class EventCreationHandler(object):
         self.notifier = hs.get_notifier()
         self.config = hs.config
 
-        self.http_client = hs.get_simple_http_client()
+        self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
 
         # This is only used to get at ratelimit function, and maybe_kick_guest_users
         self.base_handler = BaseHandler(hs)
@@ -427,7 +244,7 @@ class EventCreationHandler(object):
 
         # We arbitrarily limit concurrent event creation for a room to 5.
         # This is to stop us from diverging history *too* much.
-        self.limiter = Limiter(max_count=5)
+        self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
 
         self.action_generator = hs.get_action_generator()
 
@@ -459,10 +276,14 @@ class EventCreationHandler(object):
                 where *hashes* is a map from algorithm to hash.
 
                 If None, they will be requested from the database.
-
+        Raises:
+            ResourceLimitError if the server is blocked because a resource
+            limit has been exceeded.
         Returns:
             Tuple of created event (FrozenEvent), Context
         """
+        yield self.auth.check_auth_blocking(requester.user.to_string())
+
         builder = self.event_builder_factory.new(event_dict)
 
         self.validator.validate_new(builder)
@@ -630,7 +451,8 @@ class EventCreationHandler(object):
         If so, returns the version of the event in context.
         Otherwise, returns None.
         """
-        prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        prev_event_id = prev_state_ids.get((event.type, event.state_key))
         prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
         if not prev_event:
             return
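
This hunk is one instance of a change applied throughout the merge: EventContext no longer exposes prev_state_ids/current_state_ids as plain attributes, so callers must yield on accessor methods that can pull the state map from the database on demand. A sketch of the migrated shape:

    from twisted.internet import defer

    from synapse.api.constants import EventTypes

    @defer.inlineCallbacks
    def get_prev_membership_event_id(store, event, context):
        # Old: context.prev_state_ids[(EventTypes.Member, state_key)]
        # New: the state map is fetched lazily, so this must be yielded on.
        prev_state_ids = yield context.get_prev_state_ids(store)
        defer.returnValue(
            prev_state_ids.get((EventTypes.Member, event.state_key))
        )
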
@@ -752,8 +574,8 @@ class EventCreationHandler(object):
         event = builder.build()
 
         logger.debug(
-            "Created event %s with state: %s",
-            event.event_id, context.prev_state_ids,
+            "Created event %s",
+            event.event_id,
         )
 
         defer.returnValue(
@@ -805,11 +627,9 @@ class EventCreationHandler(object):
         try:
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
-                yield send_event_to_master(
-                    self.hs.get_clock(),
-                    self.http_client,
-                    host=self.config.worker_replication_host,
-                    port=self.config.worker_replication_http_port,
+                yield self.send_event_to_master(
+                    event_id=event.event_id,
+                    store=self.store,
                     requester=requester,
                     event=event,
                     context=context,
@@ -884,9 +704,11 @@ class EventCreationHandler(object):
                         e.sender == event.sender
                     )
 
+                current_state_ids = yield context.get_current_state_ids(self.store)
+
                 state_to_include_ids = [
                     e_id
-                    for k, e_id in iteritems(context.current_state_ids)
+                    for k, e_id in iteritems(current_state_ids)
                     if k[0] in self.hs.config.room_invite_state_types
                     or k == (EventTypes.Member, event.sender)
                 ]
@@ -922,8 +744,9 @@ class EventCreationHandler(object):
                     )
 
         if event.type == EventTypes.Redaction:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=True,
+                event, prev_state_ids, for_verification=True,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -943,21 +766,20 @@ class EventCreationHandler(object):
                         "You don't have permission to redact events"
                     )
 
-        if event.type == EventTypes.Create and context.prev_state_ids:
-            raise AuthError(
-                403,
-                "Changing the room create event is forbidden",
-            )
+        if event.type == EventTypes.Create:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
+            if prev_state_ids:
+                raise AuthError(
+                    403,
+                    "Changing the room create event is forbidden",
+                )
 
         (event_stream_id, max_stream_id) = yield self.store.persist_event(
             event, context=context
         )
 
-        # this intentionally does not yield: we don't care about the result
-        # and don't need to wait for it.
-        run_in_background(
-            self.pusher_pool.on_new_notifications,
-            event_stream_id, max_stream_id
+        self.pusher_pool.on_new_notifications(
+            event_stream_id, max_stream_id,
         )
 
         def _notify():
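
The old Limiter is folded into Linearizer above, which with max_count=5 behaves as a per-key semaphore rather than a strict mutex. A sketch of how the event-creation limiter is used (the body is a placeholder):

    from twisted.internet import defer

    from synapse.util.async_helpers import Linearizer

    limiter = Linearizer(max_count=5, name="room_event_creation_limit")

    @defer.inlineCallbacks
    def create_event(room_id):
        # At most five callers may be creating events for the same room_id
        # concurrently; the rest queue here, bounding history divergence.
        with (yield limiter.queue(room_id)):
            defer.returnValue(None)  # placeholder for build/persist work
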
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
new file mode 100644
index 0000000000..5170d093e3
--- /dev/null
+++ b/synapse/handlers/pagination.py
@@ -0,0 +1,298 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 - 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+from twisted.python.failure import Failure
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import SynapseError
+from synapse.events.utils import serialize_event
+from synapse.types import RoomStreamToken
+from synapse.util.async_helpers import ReadWriteLock
+from synapse.util.logcontext import run_in_background
+from synapse.util.stringutils import random_string
+from synapse.visibility import filter_events_for_client
+
+logger = logging.getLogger(__name__)
+
+
+class PurgeStatus(object):
+    """Object tracking the status of a purge request
+
+    This class contains information on the progress of a purge request, for
+    return by get_purge_status.
+
+    Attributes:
+        status (int): Tracks whether this request has completed. One of
+            STATUS_{ACTIVE,COMPLETE,FAILED}
+    """
+
+    STATUS_ACTIVE = 0
+    STATUS_COMPLETE = 1
+    STATUS_FAILED = 2
+
+    STATUS_TEXT = {
+        STATUS_ACTIVE: "active",
+        STATUS_COMPLETE: "complete",
+        STATUS_FAILED: "failed",
+    }
+
+    def __init__(self):
+        self.status = PurgeStatus.STATUS_ACTIVE
+
+    def asdict(self):
+        return {
+            "status": PurgeStatus.STATUS_TEXT[self.status]
+        }
+
+
+class PaginationHandler(object):
+    """Handles pagination and purge history requests.
+
+    These are in the same handler because we need to block clients from
+    paginating during a purge.
+    """
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+        self.pagination_lock = ReadWriteLock()
+        self._purges_in_progress_by_room = set()
+        # map from purge id to PurgeStatus
+        self._purges_by_id = {}
+
+    def start_purge_history(self, room_id, token,
+                            delete_local_events=False):
+        """Start off a history purge on a room.
+
+        Args:
+            room_id (str): The room to purge from
+
+            token (str): topological token to delete events before
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            str: unique ID for this purge transaction.
+        """
+        if room_id in self._purges_in_progress_by_room:
+            raise SynapseError(
+                400,
+                "History purge already in progress for %s" % (room_id, ),
+            )
+
+        purge_id = random_string(16)
+
+        # we log the purge_id here so that it can be tied back to the
+        # request id in the log lines.
+        logger.info("[purge] starting purge_id %s", purge_id)
+
+        self._purges_by_id[purge_id] = PurgeStatus()
+        run_in_background(
+            self._purge_history,
+            purge_id, room_id, token, delete_local_events,
+        )
+        return purge_id
+
+    @defer.inlineCallbacks
+    def _purge_history(self, purge_id, room_id, token,
+                       delete_local_events):
+        """Carry out a history purge on a room.
+
+        Args:
+            purge_id (str): The id for this purge
+            room_id (str): The room to purge from
+            token (str): topological token to delete events before
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            Deferred
+        """
+        self._purges_in_progress_by_room.add(room_id)
+        try:
+            with (yield self.pagination_lock.write(room_id)):
+                yield self.store.purge_history(
+                    room_id, token, delete_local_events,
+                )
+            logger.info("[purge] complete")
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+        except Exception:
+            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+        finally:
+            self._purges_in_progress_by_room.discard(room_id)
+
+            # remove the purge from the list 24 hours after it completes
+            def clear_purge():
+                del self._purges_by_id[purge_id]
+            self.hs.get_reactor().callLater(24 * 3600, clear_purge)
+
+    def get_purge_status(self, purge_id):
+        """Get the current status of an active purge
+
+        Args:
+            purge_id (str): purge_id returned by start_purge_history
+
+        Returns:
+            PurgeStatus|None
+        """
+        return self._purges_by_id.get(purge_id)
+
+    @defer.inlineCallbacks
+    def get_messages(self, requester, room_id=None, pagin_config=None,
+                     as_client_event=True, event_filter=None):
+        """Get messages in a room.
+
+        Args:
+            requester (Requester): The user requesting messages.
+            room_id (str): The room they want messages from.
+            pagin_config (synapse.streams.config.PaginationConfig): The pagination
+                config rules to apply, if any.
+            as_client_event (bool): True to get events in client-server format.
+            event_filter (Filter): Filter to apply to results or None
+        Returns:
+            dict: Pagination API results
+        """
+        user_id = requester.user.to_string()
+
+        if pagin_config.from_token:
+            room_token = pagin_config.from_token.room_key
+        else:
+            pagin_config.from_token = (
+                yield self.hs.get_event_sources().get_current_token_for_room(
+                    room_id=room_id
+                )
+            )
+            room_token = pagin_config.from_token.room_key
+
+        room_token = RoomStreamToken.parse(room_token)
+
+        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+            "room_key", str(room_token)
+        )
+
+        source_config = pagin_config.get_source_config("room")
+
+        with (yield self.pagination_lock.read(room_id)):
+            membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
+                room_id, user_id
+            )
+
+            if source_config.direction == 'b':
+                # if we're going backwards, we might need to backfill. This
+                # requires that we have a topo token.
+                if room_token.topological:
+                    max_topo = room_token.topological
+                else:
+                    max_topo = yield self.store.get_max_topological_token(
+                        room_id, room_token.stream
+                    )
+
+                if membership == Membership.LEAVE:
+                    # If they have left the room then clamp the token to be before
+                    # they left the room, to save the effort of loading from the
+                    # database.
+                    leave_token = yield self.store.get_topological_token_for_event(
+                        member_event_id
+                    )
+                    leave_token = RoomStreamToken.parse(leave_token)
+                    if leave_token.topological < max_topo:
+                        source_config.from_key = str(leave_token)
+
+                yield self.hs.get_handlers().federation_handler.maybe_backfill(
+                    room_id, max_topo
+                )
+
+            events, next_key = yield self.store.paginate_room_events(
+                room_id=room_id,
+                from_key=source_config.from_key,
+                to_key=source_config.to_key,
+                direction=source_config.direction,
+                limit=source_config.limit,
+                event_filter=event_filter,
+            )
+
+            next_token = pagin_config.from_token.copy_and_replace(
+                "room_key", next_key
+            )
+
+        if not events:
+            defer.returnValue({
+                "chunk": [],
+                "start": pagin_config.from_token.to_string(),
+                "end": next_token.to_string(),
+            })
+
+        if event_filter:
+            events = event_filter.filter(events)
+
+        events = yield filter_events_for_client(
+            self.store,
+            user_id,
+            events,
+            is_peeking=(member_event_id is None),
+        )
+
+        state = None
+        if event_filter and event_filter.lazy_load_members():
+            # TODO: remove redundant members
+
+            types = [
+                (EventTypes.Member, state_key)
+                for state_key in set(
+                    event.sender  # FIXME: we also care about invite targets etc.
+                    for event in events
+                )
+            ]
+
+            state_ids = yield self.store.get_state_ids_for_event(
+                events[0].event_id, types=types,
+            )
+
+            if state_ids:
+                state = yield self.store.get_events(list(state_ids.values()))
+
+            if state:
+                state = yield filter_events_for_client(
+                    self.store,
+                    user_id,
+                    state.values(),
+                    is_peeking=(member_event_id is None),
+                )
+
+        time_now = self.clock.time_msec()
+
+        chunk = {
+            "chunk": [
+                serialize_event(e, time_now, as_client_event)
+                for e in events
+            ],
+            "start": pagin_config.from_token.to_string(),
+            "end": next_token.to_string(),
+        }
+
+        if state:
+            chunk["state"] = [
+                serialize_event(e, time_now, as_client_event)
+                for e in state
+            ]
+
+        defer.returnValue(chunk)
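
A hypothetical admin-side flow through the purge API this new handler exposes (hs.get_pagination_handler() is the accessor this refactor is assumed to add to the HomeServer):

    def purge_and_report(hs, room_id, topological_token):
        # start_purge_history returns immediately; the purge itself runs in
        # the background and its progress is polled via get_purge_status.
        handler = hs.get_pagination_handler()  # assumed accessor
        purge_id = handler.start_purge_history(
            room_id, topological_token, delete_local_events=False,
        )
        status = handler.get_purge_status(purge_id)
        # e.g. {"status": "active"} while running, then "complete"/"failed"
        return purge_id, status.asdict() if status else None
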
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 3732830194..ba3856674d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -36,7 +36,7 @@ from synapse.api.errors import SynapseError
 from synapse.metrics import LaterGauge
 from synapse.storage.presence import UserPresenceState
 from synapse.types import UserID, get_domain_from_id
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.logcontext import run_in_background
 from synapse.util.logutils import log_function
@@ -95,6 +95,7 @@ class PresenceHandler(object):
         Args:
             hs (synapse.server.HomeServer):
         """
+        self.hs = hs
         self.is_mine = hs.is_mine
         self.is_mine_id = hs.is_mine_id
         self.clock = hs.get_clock()
@@ -230,6 +231,10 @@ class PresenceHandler(object):
         earlier than they should when synapse is restarted. The effect of this
         is some spurious presence changes that will self-correct.
         """
+        # If the DB pool has already terminated, don't try updating
+        if not self.hs.get_db_pool().running:
+            return
+
         logger.info(
             "Performing _on_shutdown. Persisting %d unpersisted changes",
             len(self.user_to_current_state)
@@ -390,6 +395,10 @@ class PresenceHandler(object):
         """We've seen the user do something that indicates they're interacting
         with the app.
         """
+        # If presence is disabled, no-op
+        if not self.hs.config.use_presence:
+            return
+
         user_id = user.to_string()
 
         bump_active_time_counter.inc()
@@ -419,6 +428,11 @@ class PresenceHandler(object):
                 Useful for streams that are not associated with an actual
                 client that is being used by a user.
         """
+        # If presence is disabled, override affect_presence so that this
+        # sync does not touch the user's presence state.
+        if not self.hs.config.use_presence:
+            affect_presence = False
+
         if affect_presence:
             curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
             self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -464,13 +478,16 @@ class PresenceHandler(object):
         Returns:
             set(str): A set of user_id strings.
         """
-        syncing_user_ids = {
-            user_id for user_id, count in self.user_to_num_current_syncs.items()
-            if count
-        }
-        for user_ids in self.external_process_to_current_syncs.values():
-            syncing_user_ids.update(user_ids)
-        return syncing_user_ids
+        if self.hs.config.use_presence:
+            syncing_user_ids = {
+                user_id for user_id, count in self.user_to_num_current_syncs.items()
+                if count
+            }
+            for user_ids in self.external_process_to_current_syncs.values():
+                syncing_user_ids.update(user_ids)
+            return syncing_user_ids
+        else:
+            return set()
 
     @defer.inlineCallbacks
     def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
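
All three presence hunks apply the same gate: when use_presence is disabled in the server config, presence-mutating entry points become no-ops and queries return empty results. A minimal sketch of the gate (the config attribute name is taken from the diff; the yaml spelling is an assumption):

    # homeserver.yaml (assumed spelling):
    #
    #     use_presence: false

    def get_currently_syncing_users(hs, user_to_num_current_syncs):
        # With presence disabled, report nobody as syncing so that no
        # presence updates are generated or federated out.
        if not hs.config.use_presence:
            return set()
        return set(
            user_id
            for user_id, count in user_to_num_current_syncs.items()
            if count
        )
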
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 3987af85d8..c3506b0db0 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -20,7 +20,14 @@ from signedjson.sign import sign_json
 
 from twisted.internet import defer, reactor
 
-from synapse.api.errors import AuthError, CodeMessageException, SynapseError
+from synapse.api.errors import (
+    AuthError,
+    CodeMessageException,
+    Codes,
+    StoreError,
+    SynapseError,
+)
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.logcontext import run_in_background
 
@@ -49,7 +56,7 @@ class ProfileHandler(BaseHandler):
 
         if hs.config.worker_app is None:
             self.clock.looping_call(
-                self._update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+                self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
             )
 
             if len(self.hs.config.replicate_user_profiles_to) > 0:
@@ -127,12 +134,17 @@ class ProfileHandler(BaseHandler):
     def get_profile(self, user_id):
         target_user = UserID.from_string(user_id)
         if self.hs.is_mine(target_user):
-            displayname = yield self.store.get_profile_displayname(
-                target_user.localpart
-            )
-            avatar_url = yield self.store.get_profile_avatar_url(
-                target_user.localpart
-            )
+            try:
+                displayname = yield self.store.get_profile_displayname(
+                    target_user.localpart
+                )
+                avatar_url = yield self.store.get_profile_avatar_url(
+                    target_user.localpart
+                )
+            except StoreError as e:
+                if e.code == 404:
+                    raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+                raise
 
             defer.returnValue({
                 "displayname": displayname,
@@ -152,7 +164,6 @@ class ProfileHandler(BaseHandler):
             except CodeMessageException as e:
                 if e.code != 404:
                     logger.exception("Failed to get displayname")
-
                 raise
 
     @defer.inlineCallbacks
@@ -163,12 +174,17 @@ class ProfileHandler(BaseHandler):
         """
         target_user = UserID.from_string(user_id)
         if self.hs.is_mine(target_user):
-            displayname = yield self.store.get_profile_displayname(
-                target_user.localpart
-            )
-            avatar_url = yield self.store.get_profile_avatar_url(
-                target_user.localpart
-            )
+            try:
+                displayname = yield self.store.get_profile_displayname(
+                    target_user.localpart
+                )
+                avatar_url = yield self.store.get_profile_avatar_url(
+                    target_user.localpart
+                )
+            except StoreError as e:
+                if e.code == 404:
+                    raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+                raise
 
             defer.returnValue({
                 "displayname": displayname,
@@ -181,9 +197,14 @@ class ProfileHandler(BaseHandler):
     @defer.inlineCallbacks
     def get_displayname(self, target_user):
         if self.hs.is_mine(target_user):
-            displayname = yield self.store.get_profile_displayname(
-                target_user.localpart
-            )
+            try:
+                displayname = yield self.store.get_profile_displayname(
+                    target_user.localpart
+                )
+            except StoreError as e:
+                if e.code == 404:
+                    raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+                raise
 
             defer.returnValue(displayname)
         else:
@@ -200,7 +221,6 @@ class ProfileHandler(BaseHandler):
             except CodeMessageException as e:
                 if e.code != 404:
                     logger.exception("Failed to get displayname")
-
                 raise
             except Exception:
                 logger.exception("Failed to get displayname")
@@ -271,10 +291,14 @@ class ProfileHandler(BaseHandler):
     @defer.inlineCallbacks
     def get_avatar_url(self, target_user):
         if self.hs.is_mine(target_user):
-            avatar_url = yield self.store.get_profile_avatar_url(
-                target_user.localpart
-            )
-
+            try:
+                avatar_url = yield self.store.get_profile_avatar_url(
+                    target_user.localpart
+                )
+            except StoreError as e:
+                if e.code == 404:
+                    raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+                raise
             defer.returnValue(avatar_url)
         else:
             try:
@@ -341,16 +365,20 @@ class ProfileHandler(BaseHandler):
         just_field = args.get("field", None)
 
         response = {}
+        try:
+            if just_field is None or just_field == "displayname":
+                response["displayname"] = yield self.store.get_profile_displayname(
+                    user.localpart
+                )
 
-        if just_field is None or just_field == "displayname":
-            response["displayname"] = yield self.store.get_profile_displayname(
-                user.localpart
-            )
-
-        if just_field is None or just_field == "avatar_url":
-            response["avatar_url"] = yield self.store.get_profile_avatar_url(
-                user.localpart
-            )
+            if just_field is None or just_field == "avatar_url":
+                response["avatar_url"] = yield self.store.get_profile_avatar_url(
+                    user.localpart
+                )
+        except StoreError as e:
+            if e.code == 404:
+                raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
+            raise
 
         defer.returnValue(response)
 
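The same translation is repeated at each profile getter in this file: a 404 StoreError from the datastore (no profile row) becomes a client-facing 404 SynapseError with M_NOT_FOUND, while other storage errors propagate unchanged. Sketch:

    from twisted.internet import defer

    from synapse.api.errors import Codes, StoreError, SynapseError

    @defer.inlineCallbacks
    def get_displayname_or_404(store, localpart):
        try:
            displayname = yield store.get_profile_displayname(localpart)
        except StoreError as e:
            if e.code == 404:
                raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
            raise  # other storage failures are not the client's fault
        defer.returnValue(displayname)
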
@@ -383,6 +411,12 @@ class ProfileHandler(BaseHandler):
                     room_id, str(e.message)
                 )
 
+    def _start_update_remote_profile_cache(self):
+        return run_as_background_process(
+            "Update remote profile", self._update_remote_profile_cache,
+        )
+
+    @defer.inlineCallbacks
     def _update_remote_profile_cache(self):
         """Called periodically to check profiles of remote users we haven't
         checked in a while.
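
Wrapping the looping call is the standard shape for run_as_background_process: the wrapper gives the periodic work its own logcontext and registers it with the background-process metrics, instead of letting it run (and be mis-attributed) under the reactor. A sketch under the same assumptions as the diff (the class is hypothetical):

    from synapse.metrics.background_process_metrics import run_as_background_process

    PROFILE_UPDATE_MS = 60 * 1000

    class RemoteProfileCache(object):
        def __init__(self, hs):
            self.clock = hs.get_clock()
            # looping_call invokes the wrapper, not the coroutine directly
            self.clock.looping_call(self._start_update, PROFILE_UPDATE_MS)

        def _start_update(self):
            return run_as_background_process(
                "update_remote_profile_cache", self._update_remote_profile_cache,
            )

        def _update_remote_profile_cache(self):
            pass  # placeholder for the actual periodic work
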
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index 995460f82a..32108568c6 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -17,7 +17,7 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 
 from ._base import BaseHandler
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index cb905a3903..a6f3181f09 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -18,7 +18,6 @@ from twisted.internet import defer
 
 from synapse.types import get_domain_from_id
 from synapse.util import logcontext
-from synapse.util.logcontext import PreserveLoggingContext
 
 from ._base import BaseHandler
 
@@ -116,16 +115,15 @@ class ReceiptsHandler(BaseHandler):
 
         affected_room_ids = list(set([r["room_id"] for r in receipts]))
 
-        with PreserveLoggingContext():
-            self.notifier.on_new_event(
-                "receipt_key", max_batch_id, rooms=affected_room_ids
-            )
-            # Note that the min here shouldn't be relied upon to be accurate.
-            self.hs.get_pusherpool().on_new_receipts(
-                min_batch_id, max_batch_id, affected_room_ids
-            )
+        self.notifier.on_new_event(
+            "receipt_key", max_batch_id, rooms=affected_room_ids
+        )
+        # Note that the min here shouldn't be relied upon to be accurate.
+        self.hs.get_pusherpool().on_new_receipts(
+            min_batch_id, max_batch_id, affected_room_ids,
+        )
 
-            defer.returnValue(True)
+        defer.returnValue(True)
 
     @logcontext.preserve_fn   # caller should not yield on this
     @defer.inlineCallbacks
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 8e9a82166f..3e061c89dc 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -28,7 +28,7 @@ from synapse.api.errors import (
 )
 from synapse.http.client import CaptchaServerHttpClient
 from synapse.types import RoomAlias, RoomID, UserID, create_requester
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.threepids import check_3pid_allowed
 
 from ._base import BaseHandler
@@ -45,7 +45,7 @@ class RegistrationHandler(BaseHandler):
             hs (synapse.server.HomeServer):
         """
         super(RegistrationHandler, self).__init__(hs)
-
+        self.hs = hs
         self.auth = hs.get_auth()
         self._auth_handler = hs.get_auth_handler()
         self.profile_handler = hs.get_profile_handler()
@@ -132,7 +132,7 @@ class RegistrationHandler(BaseHandler):
         Args:
             localpart : The local part of the user ID to register. If None,
               one will be generated.
-            password (str) : The password to assign to this user so they can
+            password (unicode): The password to assign to this user so they can
               login again. This can be None which means they cannot login again
               via a password (e.g. the user is an application service user).
             generate_token (bool): Whether a new access token should be
@@ -146,6 +146,8 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
+
+        yield self.auth.check_auth_blocking()
         password_hash = None
         if password:
             password_hash = yield self.auth_handler().hash(password)
@@ -305,6 +307,7 @@ class RegistrationHandler(BaseHandler):
                 400,
                 "User ID can only contain characters a-z, 0-9, or '=_-./'",
             )
+        yield self.auth.check_auth_blocking()
         user = UserID(localpart, self.hs.hostname)
         user_id = user.to_string()
 
@@ -459,7 +462,7 @@ class RegistrationHandler(BaseHandler):
         """
         if localpart is None:
             raise SynapseError(400, "Request must include user id")
-
+        yield self.auth.check_auth_blocking()
         need_register = True
 
         try:
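
Each registration entry point now starts with the same guard: check_auth_blocking raises ResourceLimitError (e.g. when a monthly-active-user cap is hit) before any work is done. A sketch of the guard applied to a hypothetical registration helper:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def guarded_register(auth, do_register, localpart):
        # Refuse early if the server is resource-blocked; the
        # ResourceLimitError propagates straight back to the client.
        yield auth.check_auth_blocking()
        result = yield do_register(localpart)
        defer.returnValue(result)
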
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 78444efad2..621b91d2ae 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -15,16 +15,25 @@
 # limitations under the License.
 
 """Contains functions for performing events on rooms."""
+import itertools
 import logging
 import math
 import string
 from collections import OrderedDict
 
+from six import string_types
+
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
+from synapse.api.constants import (
+    DEFAULT_ROOM_VERSION,
+    KNOWN_ROOM_VERSIONS,
+    EventTypes,
+    JoinRules,
+    RoomCreationPreset,
+)
 from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
-from synapse.types import RoomAlias, RoomID, RoomStreamToken, UserID
+from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
 from synapse.visibility import filter_events_for_client
 
@@ -91,15 +100,34 @@ class RoomCreationHandler(BaseHandler):
         Raises:
             SynapseError if the room ID couldn't be stored, or something went
             horribly wrong.
+            ResourceLimitError if the server is blocked because a resource
+            limit has been exceeded.
         """
         user_id = requester.user.to_string()
 
+        yield self.auth.check_auth_blocking(user_id)
+
         if not self.spam_checker.user_may_create_room(user_id):
             raise SynapseError(403, "You are not permitted to create rooms")
 
         if ratelimit:
             yield self.ratelimit(requester)
 
+        room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
+        if not isinstance(room_version, string_types):
+            raise SynapseError(
+                400,
+                "room_version must be a string",
+                Codes.BAD_JSON,
+            )
+
+        if room_version not in KNOWN_ROOM_VERSIONS:
+            raise SynapseError(
+                400,
+                "Your homeserver does not support this room version",
+                Codes.UNSUPPORTED_ROOM_VERSION,
+            )
+
         if "room_alias_name" in config:
             for wchar in string.whitespace:
                 if wchar in config["room_alias_name"]:
@@ -185,6 +213,9 @@ class RoomCreationHandler(BaseHandler):
 
         creation_content = config.get("creation_content", {})
 
+        # override any attempt to set room versions via the creation_content
+        creation_content["room_version"] = room_version
+
         room_member_handler = self.hs.get_room_member_handler()
 
         yield self._send_events_for_new_room(
@@ -406,9 +437,13 @@ class RoomCreationHandler(BaseHandler):
             )
 
 
-class RoomContextHandler(BaseHandler):
+class RoomContextHandler(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+
     @defer.inlineCallbacks
-    def get_event_context(self, user, room_id, event_id, limit):
+    def get_event_context(self, user, room_id, event_id, limit, event_filter):
         """Retrieves events, pagination tokens and state around a given event
         in a room.
 
@@ -418,6 +453,8 @@ class RoomContextHandler(BaseHandler):
             event_id (str)
             limit (int): The maximum number of events to return in total
                 (excluding state).
+            event_filter (Filter|None): the filter to apply to the events returned
+                (excluding the target event_id)
 
         Returns:
             dict, or None if the event isn't found
@@ -425,8 +462,6 @@ class RoomContextHandler(BaseHandler):
         before_limit = math.floor(limit / 2.)
         after_limit = limit - before_limit
 
-        now_token = yield self.hs.get_event_sources().get_current_token()
-
         users = yield self.store.get_users_in_room(room_id)
         is_peeking = user.to_string() not in users
 
@@ -452,7 +487,7 @@ class RoomContextHandler(BaseHandler):
             )
 
         results = yield self.store.get_events_around(
-            room_id, event_id, before_limit, after_limit
+            room_id, event_id, before_limit, after_limit, event_filter
         )
 
         results["events_before"] = yield filter_evts(results["events_before"])
@@ -464,16 +499,35 @@ class RoomContextHandler(BaseHandler):
         else:
             last_event_id = event_id
 
+        types = None
+        filtered_types = None
+        if event_filter and event_filter.lazy_load_members():
+            members = set(ev.sender for ev in itertools.chain(
+                results["events_before"],
+                (results["event"],),
+                results["events_after"],
+            ))
+            filtered_types = [EventTypes.Member]
+            types = [(EventTypes.Member, member) for member in members]
+
+        # XXX: why do we return the state as of the last event rather than the
+        # first? Shouldn't we be consistent with /sync?
+        # https://github.com/matrix-org/matrix-doc/issues/687
+
         state = yield self.store.get_state_for_events(
-            [last_event_id], None
+            [last_event_id], types, filtered_types=filtered_types,
         )
         results["state"] = list(state[last_event_id].values())
 
-        results["start"] = now_token.copy_and_replace(
+        # We use a dummy token here as we only care about the room portion of
+        # the token, which we replace.
+        token = StreamToken.START
+
+        results["start"] = token.copy_and_replace(
             "room_key", results["start"]
         ).to_string()
 
-        results["end"] = now_token.copy_and_replace(
+        results["end"] = token.copy_and_replace(
             "room_key", results["end"]
         ).to_string()
 
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 828229f5c3..37e41afd61 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -26,7 +26,7 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules
 from synapse.types import ThirdPartyInstanceID
-from synapse.util.async import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 00f2e279bc..fb94b5d7d4 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -30,7 +30,7 @@ import synapse.types
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.types import RoomID, UserID
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room, user_left_room
 
 logger = logging.getLogger(__name__)
@@ -201,7 +201,9 @@ class RoomMemberHandler(object):
             ratelimit=ratelimit,
         )
 
-        prev_member_event_id = context.prev_state_ids.get(
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+        prev_member_event_id = prev_state_ids.get(
             (EventTypes.Member, target.to_string()),
             None
         )
@@ -496,9 +498,10 @@ class RoomMemberHandler(object):
         if prev_event is not None:
             return
 
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
         if event.membership == Membership.JOIN:
             if requester.is_guest:
-                guest_can_join = yield self._can_guest_join(context.prev_state_ids)
+                guest_can_join = yield self._can_guest_join(prev_state_ids)
                 if not guest_can_join:
                     # This should be an auth check, but guests are a local concept,
                     # so don't really fit into the general auth process.
@@ -517,7 +520,7 @@ class RoomMemberHandler(object):
             ratelimit=ratelimit,
         )
 
-        prev_member_event_id = context.prev_state_ids.get(
+        prev_member_event_id = prev_state_ids.get(
             (EventTypes.Member, event.state_key),
             None
         )
@@ -705,6 +708,10 @@ class RoomMemberHandler(object):
             inviter_display_name = member_event.content.get("displayname", "")
             inviter_avatar_url = member_event.content.get("avatar_url", "")
 
+        # if user has no display name, default to their MXID
+        if not inviter_display_name:
+            inviter_display_name = user.to_string()
+
         canonical_room_alias = ""
         canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
         if canonical_alias_event:
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 22d8b4b0d3..acc6eb8099 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -20,16 +20,24 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError
 from synapse.handlers.room_member import RoomMemberHandler
 from synapse.replication.http.membership import (
-    get_or_register_3pid_guest,
-    notify_user_membership_change,
-    remote_join,
-    remote_reject_invite,
+    ReplicationRegister3PIDGuestRestServlet as Repl3PID,
+    ReplicationRemoteJoinRestServlet as ReplRemoteJoin,
+    ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite,
+    ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft,
 )
 
 logger = logging.getLogger(__name__)
 
 
 class RoomMemberWorkerHandler(RoomMemberHandler):
+    def __init__(self, hs):
+        super(RoomMemberWorkerHandler, self).__init__(hs)
+
+        self._get_register_3pid_client = Repl3PID.make_client(hs)
+        self._remote_join_client = ReplRemoteJoin.make_client(hs)
+        self._remote_reject_client = ReplRejectInvite.make_client(hs)
+        self._notify_change_client = ReplJoinedLeft.make_client(hs)
+
     @defer.inlineCallbacks
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
         """Implements RoomMemberHandler._remote_join
@@ -37,10 +45,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
         if len(remote_room_hosts) == 0:
             raise SynapseError(404, "No known servers")
 
-        ret = yield remote_join(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        ret = yield self._remote_join_client(
             requester=requester,
             remote_room_hosts=remote_room_hosts,
             room_id=room_id,
@@ -55,10 +60,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
         """Implements RoomMemberHandler._remote_reject_invite
         """
-        return remote_reject_invite(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._remote_reject_client(
             requester=requester,
             remote_room_hosts=remote_room_hosts,
             room_id=room_id,
@@ -68,10 +70,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def _user_joined_room(self, target, room_id):
         """Implements RoomMemberHandler._user_joined_room
         """
-        return notify_user_membership_change(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._notify_change_client(
             user_id=target.to_string(),
             room_id=room_id,
             change="joined",
@@ -80,10 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def _user_left_room(self, target, room_id):
         """Implements RoomMemberHandler._user_left_room
         """
-        return notify_user_membership_change(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._notify_change_client(
             user_id=target.to_string(),
             room_id=room_id,
             change="left",
@@ -92,10 +88,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
     def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
         """Implements RoomMemberHandler.get_or_register_3pid_guest
         """
-        return get_or_register_3pid_guest(
-            self.simple_http_client,
-            host=self.config.worker_replication_host,
-            port=self.config.worker_replication_http_port,
+        return self._get_register_3pid_client(
             requester=requester,
             medium=medium,
             address=address,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 69ae9731d5..c464adbd0b 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -287,7 +287,7 @@ class SearchHandler(BaseHandler):
             contexts = {}
             for event in allowed_events:
                 res = yield self.store.get_events_around(
-                    event.room_id, event.event_id, before_limit, after_limit
+                    event.room_id, event.event_id, before_limit, after_limit,
                 )
 
                 logger.info(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 82c5fee759..113d711d4c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
-# Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -24,7 +25,9 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.types import RoomStreamToken
-from synapse.util.async import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
@@ -32,6 +35,14 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+# Keep the cache that tracks which lazy-loaded members have been sent to a
+# given client for no more than 30 minutes (the value is in milliseconds).
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
+
+# Remember the last 100 members we sent to a client, so that we can avoid
+# redundantly re-sending the same lazy-loaded members.
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
+
 
 SyncConfig = collections.namedtuple("SyncConfig", [
     "user",
@@ -64,6 +75,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
     "ephemeral",
     "account_data",
     "unread_notifications",
+    "summary",
 ])):
     __slots__ = []
 
@@ -173,6 +185,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
 class SyncHandler(object):
 
     def __init__(self, hs):
+        self.hs_config = hs.config
         self.store = hs.get_datastore()
         self.notifier = hs.get_notifier()
         self.presence_handler = hs.get_presence_handler()
@@ -180,20 +193,35 @@ class SyncHandler(object):
         self.clock = hs.get_clock()
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
+        self.auth = hs.get_auth()
 
+        # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+        self.lazy_loaded_members_cache = ExpiringCache(
+            "lazy_loaded_members_cache", self.clock,
+            max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+        )
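+        # (illustrative, with hypothetical ids: self.lazy_loaded_members_cache
+        #  [("@alice:example.com", "DEVICEID")] is an LruCache whose
+        #  .get("@bob:example.com") returns the event_id of the membership
+        #  event we most recently sent to that device for that member)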
+
+    @defer.inlineCallbacks
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
         return an empty sync result.
         Returns:
-            A Deferred SyncResult.
+            Deferred[SyncResult]
         """
-        return self.response_cache.wrap(
+        # If the user is not part of the MAU group, then check that the MAU
+        # limits have not been exceeded (if they are not part of the group by
+        # this point, auth blocking is almost certain to occur).
+        user_id = sync_config.user.to_string()
+        yield self.auth.check_auth_blocking(user_id)
+
+        res = yield self.response_cache.wrap(
             sync_config.request_key,
             self._wait_for_sync_for_user,
             sync_config, since_token, timeout, full_state,
         )
+        defer.returnValue(res)
 
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
@@ -416,29 +444,44 @@ class SyncHandler(object):
         ))
 
     @defer.inlineCallbacks
-    def get_state_after_event(self, event):
+    def get_state_after_event(self, event, types=None, filtered_types=None):
         """
         Get the room state after the given event
 
         Args:
             event(synapse.events.EventBase): event of interest
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is
+                None, all events of the given type are returned. If `types`
+                itself is None, no filtering is applied.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
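+            For example (illustrative values only):
+                types=[(EventTypes.Member, "@alice:example.com")] with
+                filtered_types=[EventTypes.Member] fetches only Alice's
+                membership event, while all non-member state is returned
+                unfiltered.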
 
         Returns:
             A Deferred map from (type, state_key) to event_id
         """
-        state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+        state_ids = yield self.store.get_state_ids_for_event(
+            event.event_id, types, filtered_types=filtered_types,
+        )
         if event.is_state():
             state_ids = state_ids.copy()
             state_ids[(event.type, event.state_key)] = event.event_id
         defer.returnValue(state_ids)
 
     @defer.inlineCallbacks
-    def get_state_at(self, room_id, stream_position):
+    def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
         """ Get the room state at a particular stream position
 
         Args:
             room_id(str): room for which to get state
             stream_position(StreamToken): point at which to get state
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is
+                None, all events of the given type are returned. If `types`
+                itself is None, no filtering is applied.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A Deferred map from (type, state_key) to event_id
@@ -453,7 +496,9 @@ class SyncHandler(object):
 
         if last_events:
             last_event = last_events[-1]
-            state = yield self.get_state_after_event(last_event)
+            state = yield self.get_state_after_event(
+                last_event, types, filtered_types=filtered_types,
+            )
 
         else:
             # no events in this room - so presumably no state
@@ -461,9 +506,141 @@ class SyncHandler(object):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
+    def compute_summary(self, room_id, sync_config, batch, state, now_token):
+        """ Works out a room summary block for this room, summarising the number
+        of joined members in the room, and providing the 'hero' members if the
+        room has no name, so that clients can consistently name rooms.  Also adds
+        state events to 'state' if needed to describe the heroes.
+
+        Args:
+            room_id(str):
+            sync_config(synapse.handlers.sync.SyncConfig):
+            batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
+                the room that will be sent to the user.
+            state(dict): dict of (type, state_key) -> Event as returned by
+                compute_state_delta
+            now_token(StreamToken): Token of the end of the current batch.
+
+        Returns:
+             A deferred dict describing the room summary
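+             e.g. (illustrative values):
+                 {"m.joined_member_count": 2, "m.invited_member_count": 1,
+                  "m.heroes": ["@alice:example.com", "@bob:example.com"]}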
+        """
+
+        # FIXME: this perpetuates https://github.com/matrix-org/synapse/issues/3305
+        last_events, _ = yield self.store.get_recent_event_ids_for_room(
+            room_id, end_token=now_token.room_key, limit=1,
+        )
+
+        if not last_events:
+            defer.returnValue(None)
+            return
+
+        last_event = last_events[-1]
+        state_ids = yield self.store.get_state_ids_for_event(
+            last_event.event_id, [
+                (EventTypes.Member, None),
+                (EventTypes.Name, ''),
+                (EventTypes.CanonicalAlias, ''),
+            ]
+        )
+
+        member_ids = {
+            state_key: event_id
+            for (t, state_key), event_id in state_ids.iteritems()
+            if t == EventTypes.Member
+        }
+        name_id = state_ids.get((EventTypes.Name, ''))
+        canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
+
+        summary = {}
+
+        # FIXME: it feels very heavy to load up every single membership event
+        # just to calculate the counts.
+        member_events = yield self.store.get_events(member_ids.values())
+
+        joined_user_ids = []
+        invited_user_ids = []
+
+        for ev in member_events.values():
+            if ev.content.get("membership") == Membership.JOIN:
+                joined_user_ids.append(ev.state_key)
+            elif ev.content.get("membership") == Membership.INVITE:
+                invited_user_ids.append(ev.state_key)
+
+        # TODO: only send these when they change.
+        summary["m.joined_member_count"] = len(joined_user_ids)
+        summary["m.invited_member_count"] = len(invited_user_ids)
+
+        if name_id or canonical_alias_id:
+            defer.returnValue(summary)
+
+        # FIXME: order by stream ordering, not alphabetic
+
+        me = sync_config.user.to_string()
+        if (joined_user_ids or invited_user_ids):
+            summary['m.heroes'] = sorted(
+                [
+                    user_id
+                    for user_id in (joined_user_ids + invited_user_ids)
+                    if user_id != me
+                ]
+            )[0:5]
+        else:
+            summary['m.heroes'] = sorted(
+                [user_id for user_id in member_ids.keys() if user_id != me]
+            )[0:5]
+
+        if not sync_config.filter_collection.lazy_load_members():
+            defer.returnValue(summary)
+
+        # ensure we send membership events for heroes if needed
+        cache_key = (sync_config.user.to_string(), sync_config.device_id)
+        cache = self.get_lazy_loaded_members_cache(cache_key)
+
+        # track which members the client should already know about via
+        # lazy loading:
+        # Ones which are already in state...
+        existing_members = set(
+            user_id for (typ, user_id) in state.keys()
+            if typ == EventTypes.Member
+        )
+
+        # ...or ones which are in the timeline...
+        for ev in batch.events:
+            if ev.type == EventTypes.Member:
+                existing_members.add(ev.state_key)
+
+        # ...and then ensure any missing ones get included in state.
+        missing_hero_event_ids = [
+            member_ids[hero_id]
+            for hero_id in summary['m.heroes']
+            if (
+                cache.get(hero_id) != member_ids[hero_id] and
+                hero_id not in existing_members
+            )
+        ]
+
+        missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
+        missing_hero_state = missing_hero_state.values()
+
+        for s in missing_hero_state:
+            cache.set(s.state_key, s.event_id)
+            state[(EventTypes.Member, s.state_key)] = s
+
+        defer.returnValue(summary)
+
+    def get_lazy_loaded_members_cache(self, cache_key):
+        cache = self.lazy_loaded_members_cache.get(cache_key)
+        if cache is None:
+            logger.debug("creating LruCache for %r", cache_key)
+            cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+            self.lazy_loaded_members_cache[cache_key] = cache
+        else:
+            logger.debug("found LruCache for %r", cache_key)
+        return cache
+
+    @defer.inlineCallbacks
     def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
                             full_state):
-        """ Works out the differnce in state between the start of the timeline
+        """ Works out the difference in state between the start of the timeline
         and the previous sync.
 
         Args:
@@ -477,7 +654,7 @@ class SyncHandler(object):
             full_state(bool): Whether to force returning the full state.
 
         Returns:
-             A deferred new event dictionary
+             A deferred dict of (type, state_key) -> Event
         """
         # TODO(mjark) Check if the state events were received by the server
         # after the previous sync, since we need to include those state
@@ -485,59 +662,123 @@ class SyncHandler(object):
         # TODO(mjark) Check for new redactions in the state events.
 
         with Measure(self.clock, "compute_state_delta"):
+
+            types = None
+            filtered_types = None
+
+            lazy_load_members = sync_config.filter_collection.lazy_load_members()
+            include_redundant_members = (
+                sync_config.filter_collection.include_redundant_members()
+            )
+
+            if lazy_load_members:
+                # We only request state for the members needed to display the
+                # timeline:
+
+                types = [
+                    (EventTypes.Member, state_key)
+                    for state_key in set(
+                        event.sender  # FIXME: we also care about invite targets etc.
+                        for event in batch.events
+                    )
+                ]
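+                # (e.g. if @alice:example.com is the only sender in the batch,
+                #  this yields types == [(EventTypes.Member,
+                #  "@alice:example.com")]; illustrative value only)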
+
+                # only apply the filtering to room members
+                filtered_types = [EventTypes.Member]
+
+            timeline_state = {
+                (event.type, event.state_key): event.event_id
+                for event in batch.events if event.is_state()
+            }
+
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[-1].event_id
+                        batch.events[-1].event_id, types=types,
+                        filtered_types=filtered_types,
                     )
 
                     state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[0].event_id
+                        batch.events[0].event_id, types=types,
+                        filtered_types=filtered_types,
                     )
+
                 else:
                     current_state_ids = yield self.get_state_at(
-                        room_id, stream_position=now_token
+                        room_id, stream_position=now_token, types=types,
+                        filtered_types=filtered_types,
                     )
 
                     state_ids = current_state_ids
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_ids,
                     previous={},
                     current=current_state_ids,
+                    lazy_load_members=lazy_load_members,
                 )
             elif batch.limited:
                 state_at_previous_sync = yield self.get_state_at(
-                    room_id, stream_position=since_token
+                    room_id, stream_position=since_token, types=types,
+                    filtered_types=filtered_types,
                 )
 
                 current_state_ids = yield self.store.get_state_ids_for_event(
-                    batch.events[-1].event_id
+                    batch.events[-1].event_id, types=types,
+                    filtered_types=filtered_types,
                 )
 
                 state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id
+                    batch.events[0].event_id, types=types,
+                    filtered_types=filtered_types,
                 )
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_at_timeline_start,
                     previous=state_at_previous_sync,
                     current=current_state_ids,
+                    lazy_load_members=lazy_load_members,
                 )
             else:
                 state_ids = {}
+                if lazy_load_members:
+                    if types:
+                        state_ids = yield self.store.get_state_ids_for_event(
+                            batch.events[0].event_id, types=types,
+                            filtered_types=filtered_types,
+                        )
+
+            if lazy_load_members and not include_redundant_members:
+                cache_key = (sync_config.user.to_string(), sync_config.device_id)
+                cache = self.get_lazy_loaded_members_cache(cache_key)
+
+                # if it's a new sync sequence, then assume the client has had
+                # amnesia and doesn't want any recent lazy-loaded members
+                # de-duplicated.
+                if since_token is None:
+                    logger.debug("clearing LruCache for %r", cache_key)
+                    cache.clear()
+                else:
+                    # only send members which aren't in our LruCache (either
+                    # because they're new to this client or have been pushed out
+                    # of the cache)
+                    logger.debug("filtering state from %r...", state_ids)
+                    state_ids = {
+                        t: event_id
+                        for t, event_id in state_ids.iteritems()
+                        if cache.get(t[1]) != event_id
+                    }
+                    logger.debug("...to %r", state_ids)
+
+                # add any member IDs we are about to send into our LruCache
+                for t, event_id in itertools.chain(
+                    state_ids.items(),
+                    timeline_state.items(),
+                ):
+                    if t[0] == EventTypes.Member:
+                        cache.set(t[1], event_id)
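+
+                # (on the next incremental sync, cache.get(t[1]) will match
+                #  these event_ids, so the corresponding members are filtered
+                #  out of state_ids above as redundant)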
 
         state = {}
         if state_ids:
@@ -620,7 +861,7 @@ class SyncHandler(object):
             since_token is None and
             sync_config.filter_collection.blocks_all_presence()
         )
-        if not block_all_presence_data:
+        if self.hs_config.use_presence and not block_all_presence_data:
             yield self._generate_sync_entry_for_presence(
                 sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users
             )
@@ -1318,7 +1559,6 @@ class SyncHandler(object):
             if events == [] and tags is None:
                 return
 
-        since_token = sync_result_builder.since_token
         now_token = sync_result_builder.now_token
         sync_config = sync_result_builder.sync_config
 
@@ -1361,6 +1601,18 @@ class SyncHandler(object):
             full_state=full_state
         )
 
+        summary = {}
+        if (
+            sync_config.filter_collection.lazy_load_members() and
+            (
+                any(ev.type == EventTypes.Member for ev in batch.events) or
+                since_token is None
+            )
+        ):
+            summary = yield self.compute_summary(
+                room_id, sync_config, batch, state, now_token
+            )
+
         if room_builder.rtype == "joined":
             unread_notifications = {}
             room_sync = JoinedSyncResult(
@@ -1370,6 +1622,7 @@ class SyncHandler(object):
                 ephemeral=ephemeral,
                 account_data=account_data_events,
                 unread_notifications=unread_notifications,
+                summary=summary,
             )
 
             if room_sync or always_include:
@@ -1454,7 +1707,9 @@ def _action_has_highlight(actions):
     return False
 
 
-def _calculate_state(timeline_contains, timeline_start, previous, current):
+def _calculate_state(
+    timeline_contains, timeline_start, previous, current, lazy_load_members,
+):
     """Works out what state to include in a sync response.
 
     Args:
@@ -1463,6 +1718,9 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
         previous (dict): state at the end of the previous sync (or empty dict
             if this is an initial sync)
         current (dict): state at the end of the timeline
+        lazy_load_members (bool): whether to return members from timeline_start
+            or not.  Assumes that timeline_start has already been filtered to
+            include only the members the client needs to know about.
 
     Returns:
         dict
@@ -1478,9 +1736,25 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
     }
 
     c_ids = set(e for e in current.values())
-    tc_ids = set(e for e in timeline_contains.values())
-    p_ids = set(e for e in previous.values())
     ts_ids = set(e for e in timeline_start.values())
+    p_ids = set(e for e in previous.values())
+    tc_ids = set(e for e in timeline_contains.values())
+
+    # If we are lazy-loading room members, we explicitly add the membership events
+    # for the senders in the timeline into the state block returned by /sync,
+    # as we may not have sent them to the client before.  We find these membership
+    # events by picking them out of timeline_start, which has already been filtered
+    # to only include membership events for the senders in the timeline.
+    # In practice, we do this by removing them from the p_ids set, which is the
+    # set of relevant state we know we have already sent to the client.
+    # see https://github.com/matrix-org/synapse/pull/2970
+    #            /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
+
+    if lazy_load_members:
+        p_ids.difference_update(
+            e for t, e in timeline_start.iteritems()
+            if t[0] == EventTypes.Member
+        )
 
     state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids