summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/__init__.py10
-rw-r--r--synapse/handlers/_base.py6
-rw-r--r--synapse/handlers/admin.py4
-rw-r--r--synapse/handlers/appservice.py18
-rw-r--r--synapse/handlers/auth.py74
-rw-r--r--synapse/handlers/deactivate_account.py44
-rw-r--r--synapse/handlers/device.py18
-rw-r--r--synapse/handlers/devicemessage.py3
-rw-r--r--synapse/handlers/directory.py9
-rw-r--r--synapse/handlers/e2e_keys.py17
-rw-r--r--synapse/handlers/events.py15
-rw-r--r--synapse/handlers/federation.py383
-rw-r--r--synapse/handlers/groups_local.py7
-rw-r--r--synapse/handlers/identity.py65
-rw-r--r--synapse/handlers/initial_sync.py47
-rw-r--r--synapse/handlers/message.py339
-rw-r--r--synapse/handlers/pagination.py265
-rw-r--r--synapse/handlers/presence.py23
-rw-r--r--synapse/handlers/profile.py12
-rw-r--r--synapse/handlers/read_marker.py5
-rw-r--r--synapse/handlers/receipts.py10
-rw-r--r--synapse/handlers/register.py16
-rw-r--r--synapse/handlers/room.py43
-rw-r--r--synapse/handlers/room_list.py26
-rw-r--r--synapse/handlers/room_member.py23
-rw-r--r--synapse/handlers/room_member_worker.py5
-rw-r--r--synapse/handlers/search.py31
-rw-r--r--synapse/handlers/set_password.py1
-rw-r--r--synapse/handlers/sync.py197
-rw-r--r--synapse/handlers/typing.py11
-rw-r--r--synapse/handlers/user_directory.py16
31 files changed, 980 insertions, 763 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index d358842b3e..413425fed1 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -13,13 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .register import RegistrationHandler
-from .room import RoomContextHandler
-from .message import MessageHandler
-from .federation import FederationHandler
-from .directory import DirectoryHandler
 from .admin import AdminHandler
+from .directory import DirectoryHandler
+from .federation import FederationHandler
 from .identity import IdentityHandler
+from .register import RegistrationHandler
 from .search import SearchHandler
 
 
@@ -44,10 +42,8 @@ class Handlers(object):
 
     def __init__(self, hs):
         self.registration_handler = RegistrationHandler(hs)
-        self.message_handler = MessageHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
-        self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 2d1db0c245..704181d2d3 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -18,11 +18,10 @@ import logging
 from twisted.internet import defer
 
 import synapse.types
-from synapse.api.constants import Membership, EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import LimitExceededError
 from synapse.types import UserID
 
-
 logger = logging.getLogger(__name__)
 
 
@@ -113,8 +112,9 @@ class BaseHandler(object):
             guest_access = event.content.get("guest_access", "forbidden")
             if guest_access != "can_join":
                 if context:
+                    current_state_ids = yield context.get_current_state_ids(self.store)
                     current_state = yield self.store.get_events(
-                        list(context.current_state_ids.values())
+                        list(current_state_ids.values())
                     )
                 else:
                     current_state = yield self.state_handler.get_current_state(
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index f36b358b45..5d629126fc 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -13,12 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from twisted.internet import defer
 
 from ._base import BaseHandler
 
-import logging
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 1c29c43a83..ee41aed69e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -13,19 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import logging
 
 from six import itervalues
 
+from prometheus_client import Counter
+
+from twisted.internet import defer
+
 import synapse
 from synapse.api.constants import EventTypes
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.metrics import Measure
-from synapse.util.logcontext import (
-    make_deferred_yieldable, run_in_background,
-)
-from prometheus_client import Counter
-
-import logging
 
 logger = logging.getLogger(__name__)
 
@@ -107,7 +107,9 @@ class ApplicationServicesHandler(object):
                             yield self._check_user_exists(event.state_key)
 
                         if not self.started_scheduler:
-                            self.scheduler.start().addErrback(log_failure)
+                            def start_scheduler():
+                                return self.scheduler.start().addErrback(log_failure)
+                            run_as_background_process("as_scheduler", start_scheduler)
                             self.started_scheduler = True
 
                         # Fork off pushes to these services
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 3c0051586d..402e44cdef 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -13,29 +13,33 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+import logging
+
+import attr
+import bcrypt
+import pymacaroons
+from canonicaljson import json
+
 from twisted.internet import defer, threads
+from twisted.web.client import PartialDownloadError
 
-from ._base import BaseHandler
+import synapse.util.stringutils as stringutils
 from synapse.api.constants import LoginType
 from synapse.api.errors import (
-    AuthError, Codes, InteractiveAuthIncompleteError, LoginError, StoreError,
+    AuthError,
+    Codes,
+    InteractiveAuthIncompleteError,
+    LoginError,
+    StoreError,
     SynapseError,
 )
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
-from synapse.util.async import run_on_reactor
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logcontext import make_deferred_yieldable
 
-from twisted.web.client import PartialDownloadError
-
-import logging
-import bcrypt
-import pymacaroons
-import simplejson
-
-import synapse.util.stringutils as stringutils
-
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -402,7 +406,7 @@ class AuthHandler(BaseHandler):
         except PartialDownloadError as pde:
             # Twisted is silly
             data = pde.response
-            resp_body = simplejson.loads(data)
+            resp_body = json.loads(data)
 
         if 'success' in resp_body:
             # Note that we do NOT check the hostname here: we explicitly
@@ -423,15 +427,11 @@ class AuthHandler(BaseHandler):
     def _check_msisdn(self, authdict, _):
         return self._check_threepid('msisdn', authdict)
 
-    @defer.inlineCallbacks
     def _check_dummy_auth(self, authdict, _):
-        yield run_on_reactor()
-        defer.returnValue(True)
+        return defer.succeed(True)
 
     @defer.inlineCallbacks
     def _check_threepid(self, medium, authdict):
-        yield run_on_reactor()
-
         if 'threepid_creds' not in authdict:
             raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
 
@@ -825,6 +825,15 @@ class AuthHandler(BaseHandler):
         if medium == 'email':
             address = address.lower()
 
+        identity_handler = self.hs.get_handlers().identity_handler
+        yield identity_handler.unbind_threepid(
+            user_id,
+            {
+                'medium': medium,
+                'address': address,
+            },
+        )
+
         ret = yield self.store.user_delete_threepid(
             user_id, medium, address,
         )
@@ -849,7 +858,11 @@ class AuthHandler(BaseHandler):
             return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
                                  bcrypt.gensalt(self.bcrypt_rounds))
 
-        return make_deferred_yieldable(threads.deferToThread(_do_hash))
+        return make_deferred_yieldable(
+            threads.deferToThreadPool(
+                self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash
+            ),
+        )
 
     def validate_hash(self, password, stored_hash):
         """Validates that self.hash(password) == stored_hash.
@@ -869,16 +882,21 @@ class AuthHandler(BaseHandler):
             )
 
         if stored_hash:
-            return make_deferred_yieldable(threads.deferToThread(_do_validate_hash))
+            return make_deferred_yieldable(
+                threads.deferToThreadPool(
+                    self.hs.get_reactor(),
+                    self.hs.get_reactor().getThreadPool(),
+                    _do_validate_hash,
+                ),
+            )
         else:
             return defer.succeed(False)
 
 
-class MacaroonGeneartor(object):
-    def __init__(self, hs):
-        self.clock = hs.get_clock()
-        self.server_name = hs.config.server_name
-        self.macaroon_secret_key = hs.config.macaroon_secret_key
+@attr.s
+class MacaroonGenerator(object):
+
+    hs = attr.ib()
 
     def generate_access_token(self, user_id, extra_caveats=None):
         extra_caveats = extra_caveats or []
@@ -896,7 +914,7 @@ class MacaroonGeneartor(object):
     def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
         macaroon = self._generate_base_macaroon(user_id)
         macaroon.add_first_party_caveat("type = login")
-        now = self.clock.time_msec()
+        now = self.hs.get_clock().time_msec()
         expiry = now + duration_in_ms
         macaroon.add_first_party_caveat("time < %d" % (expiry,))
         return macaroon.serialize()
@@ -908,9 +926,9 @@ class MacaroonGeneartor(object):
 
     def _generate_base_macaroon(self, user_id):
         macaroon = pymacaroons.Macaroon(
-            location=self.server_name,
+            location=self.hs.config.server_name,
             identifier="key",
-            key=self.macaroon_secret_key)
+            key=self.hs.config.macaroon_secret_key)
         macaroon.add_first_party_caveat("gen = 1")
         macaroon.add_first_party_caveat("user_id = %s" % (user_id,))
         return macaroon
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index c5e92f6214..b3c5a9ee64 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -12,13 +12,15 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from twisted.internet import defer, reactor
+import logging
 
-from ._base import BaseHandler
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
 from synapse.types import UserID, create_requester
 from synapse.util.logcontext import run_in_background
 
-import logging
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -30,6 +32,7 @@ class DeactivateAccountHandler(BaseHandler):
         self._auth_handler = hs.get_auth_handler()
         self._device_handler = hs.get_device_handler()
         self._room_member_handler = hs.get_room_member_handler()
+        self._identity_handler = hs.get_handlers().identity_handler
         self.user_directory_handler = hs.get_user_directory_handler()
 
         # Flag that indicates whether the process to part users from rooms is running
@@ -37,14 +40,15 @@ class DeactivateAccountHandler(BaseHandler):
 
         # Start the user parter loop so it can resume parting users from rooms where
         # it left off (if it has work left to do).
-        reactor.callWhenRunning(self._start_user_parting)
+        hs.get_reactor().callWhenRunning(self._start_user_parting)
 
     @defer.inlineCallbacks
-    def deactivate_account(self, user_id):
+    def deactivate_account(self, user_id, erase_data):
         """Deactivate a user's account
 
         Args:
             user_id (str): ID of user to be deactivated
+            erase_data (bool): whether to GDPR-erase the user's data
 
         Returns:
             Deferred
@@ -52,14 +56,35 @@ class DeactivateAccountHandler(BaseHandler):
         # FIXME: Theoretically there is a race here wherein user resets
         # password using threepid.
 
-        # first delete any devices belonging to the user, which will also
+        # delete threepids first. We remove these from the IS so if this fails,
+        # leave the user still active so they can try again.
+        # Ideally we would prevent password resets and then do this in the
+        # background thread.
+        threepids = yield self.store.user_get_threepids(user_id)
+        for threepid in threepids:
+            try:
+                yield self._identity_handler.unbind_threepid(
+                    user_id,
+                    {
+                        'medium': threepid['medium'],
+                        'address': threepid['address'],
+                    },
+                )
+            except Exception:
+                # Do we want this to be a fatal error or should we carry on?
+                logger.exception("Failed to remove threepid from ID server")
+                raise SynapseError(400, "Failed to remove threepid from ID server")
+            yield self.store.user_delete_threepid(
+                user_id, threepid['medium'], threepid['address'],
+            )
+
+        # delete any devices belonging to the user, which will also
         # delete corresponding access tokens.
         yield self._device_handler.delete_all_devices_for_user(user_id)
         # then delete any remaining access tokens which weren't associated with
         # a device.
         yield self._auth_handler.delete_access_tokens_for_user(user_id)
 
-        yield self.store.user_delete_threepids(user_id)
         yield self.store.user_set_password_hash(user_id, None)
 
         # Add the user to a table of users pending deactivation (ie.
@@ -69,6 +94,11 @@ class DeactivateAccountHandler(BaseHandler):
         # delete from user directory
         yield self.user_directory_handler.handle_user_deactivated(user_id)
 
+        # Mark the user as erased, if they asked for that
+        if erase_data:
+            logger.info("Marking %s as erased", user_id)
+            yield self.store.mark_user_erased(user_id)
+
         # Now start the process that goes through that list and
         # parts users from rooms (if it isn't already running)
         self._start_user_parting()
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 11c6fb3657..2d44f15da3 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -12,21 +12,23 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+
+from six import iteritems, itervalues
+
+from twisted.internet import defer
+
 from synapse.api import errors
 from synapse.api.constants import EventTypes
 from synapse.api.errors import FederationDeniedError
+from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util import stringutils
 from synapse.util.async import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.metrics import measure_func
-from synapse.types import get_domain_from_id, RoomStreamToken
-from twisted.internet import defer
-from ._base import BaseHandler
-
-import logging
+from synapse.util.retryutils import NotRetryingDestination
 
-from six import itervalues, iteritems
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -537,7 +539,7 @@ class DeviceListEduUpdater(object):
                 yield self.device_handler.notify_device_update(user_id, device_ids)
             else:
                 # Simply update the single device, since we know that is the only
-                # change (becuase of the single prev_id matching the current cache)
+                # change (because of the single prev_id matching the current cache)
                 for device_id, stream_id, prev_ids, content in pending_updates:
                     yield self.store.update_remote_device_list_cache_entry(
                         user_id, device_id, content, stream_id,
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index f147a20b73..2e2e5261de 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -18,10 +18,9 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
-from synapse.types import get_domain_from_id, UserID
+from synapse.types import UserID, get_domain_from_id
 from synapse.util.stringutils import random_string
 
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index c5b6e75e03..ef866da1b6 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -14,15 +14,16 @@
 # limitations under the License.
 
 
+import logging
+import string
+
 from twisted.internet import defer
-from ._base import BaseHandler
 
-from synapse.api.errors import SynapseError, Codes, CodeMessageException, AuthError
 from synapse.api.constants import EventTypes
+from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError
 from synapse.types import RoomAlias, UserID, get_domain_from_id
 
-import logging
-import string
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 8a2d177539..5816bf8b4f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -14,17 +14,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import simplejson as json
 import logging
 
-from canonicaljson import encode_canonical_json
-from twisted.internet import defer
 from six import iteritems
 
-from synapse.api.errors import (
-    SynapseError, CodeMessageException, FederationDeniedError,
-)
-from synapse.types import get_domain_from_id, UserID
+from canonicaljson import encode_canonical_json, json
+
+from twisted.internet import defer
+
+from synapse.api.errors import CodeMessageException, FederationDeniedError, SynapseError
+from synapse.types import UserID, get_domain_from_id
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 from synapse.util.retryutils import NotRetryingDestination
 
@@ -80,7 +79,7 @@ class E2eKeysHandler(object):
             else:
                 remote_queries[user_id] = device_ids
 
-        # Firt get local devices.
+        # First get local devices.
         failures = {}
         results = {}
         if local_query:
@@ -357,7 +356,7 @@ def _exception_to_failure(e):
     # include ConnectionRefused and other errors
     #
     # Note that some Exceptions (notably twisted's ResponseFailed etc) don't
-    # give a string for e.message, which simplejson then fails to serialize.
+    # give a string for e.message, which json then fails to serialize.
     return {
         "status": 503, "message": str(e.message),
     }
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 8bc642675f..c3f2d7feff 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -13,20 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+import random
+
 from twisted.internet import defer
 
-from synapse.util.logutils import log_function
-from synapse.types import UserID
-from synapse.events.utils import serialize_event
-from synapse.api.constants import Membership, EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
+from synapse.events.utils import serialize_event
+from synapse.types import UserID
+from synapse.util.logutils import log_function
 
 from ._base import BaseHandler
 
-import logging
-import random
-
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 495ac4c648..49068c06d9 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -20,37 +20,41 @@ import itertools
 import logging
 import sys
 
+import six
+from six import iteritems, itervalues
+from six.moves import http_client, zip
+
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
-import six
-from six.moves import http_client
-from six import iteritems
-from twisted.internet import defer
 from unpaddedbase64 import decode_base64
 
-from ._base import BaseHandler
+from twisted.internet import defer
 
+from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.api.errors import (
-    AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
+    AuthError,
+    CodeMessageException,
     FederationDeniedError,
+    FederationError,
+    StoreError,
+    SynapseError,
 )
-from synapse.api.constants import EventTypes, Membership, RejectedReason
-from synapse.events.validator import EventValidator
-from synapse.util import unwrapFirstError, logcontext
-from synapse.util.metrics import measure_func
-from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, Linearizer
-from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
-    compute_event_signature, add_hashes_and_signatures,
+    add_hashes_and_signatures,
+    compute_event_signature,
 )
+from synapse.events.validator import EventValidator
+from synapse.state import resolve_events_with_factory
 from synapse.types import UserID, get_domain_from_id
-
-from synapse.events.utils import prune_event
-
+from synapse.util import logcontext, unwrapFirstError
+from synapse.util.async import Linearizer
+from synapse.util.distributor import user_joined_room
+from synapse.util.frozenutils import unfreeze
+from synapse.util.logutils import log_function
 from synapse.util.retryutils import NotRetryingDestination
+from synapse.visibility import filter_events_for_server
 
-from synapse.util.distributor import user_joined_room
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -89,7 +93,9 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def on_receive_pdu(self, origin, pdu, get_missing=True):
+    def on_receive_pdu(
+            self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+    ):
         """ Process a PDU received via a federation /send/ transaction, or
         via backfill of missing prev_events
 
@@ -103,8 +109,10 @@ class FederationHandler(BaseHandler):
         """
 
         # We reprocess pdus when we have seen them only as outliers
-        existing = yield self.get_persisted_pdu(
-            origin, pdu.event_id, do_auth=False
+        existing = yield self.store.get_event(
+            pdu.event_id,
+            allow_none=True,
+            allow_rejected=True,
         )
 
         # FIXME: Currently we fetch an event again when we already have it
@@ -161,14 +169,11 @@ class FederationHandler(BaseHandler):
                     "Ignoring PDU %s for room %s from %s as we've left the room!",
                     pdu.event_id, pdu.room_id, origin,
                 )
-                return
+                defer.returnValue(None)
 
         state = None
-
         auth_chain = []
 
-        fetch_state = False
-
         # Get missing pdus if necessary.
         if not pdu.internal_metadata.is_outlier():
             # We only backfill backwards to the min depth.
@@ -223,26 +228,60 @@ class FederationHandler(BaseHandler):
                         list(prevs - seen)[:5],
                     )
 
-            if prevs - seen:
-                logger.info(
-                    "Still missing %d events for room %r: %r...",
-                    len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+            if sent_to_us_directly and prevs - seen:
+                # If they have sent it to us directly, and the server
+                # isn't telling us about the auth events that it's
+                # made a message referencing, we explode
+                raise FederationError(
+                    "ERROR",
+                    403,
+                    (
+                        "Your server isn't divulging details about prev_events "
+                        "referenced in this event."
+                    ),
+                    affected=pdu.event_id,
                 )
-                fetch_state = True
+            elif prevs - seen:
+                # Calculate the state of the previous events, and
+                # de-conflict them to find the current state.
+                state_groups = []
+                auth_chains = set()
+                try:
+                    # Get the state of the events we know about
+                    ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+                    state_groups.append(ours)
+
+                    # Ask the remote server for the states we don't
+                    # know about
+                    for p in prevs - seen:
+                        state, got_auth_chain = (
+                            yield self.replication_layer.get_state_for_room(
+                                origin, pdu.room_id, p
+                            )
+                        )
+                        auth_chains.update(got_auth_chain)
+                        state_group = {(x.type, x.state_key): x.event_id for x in state}
+                        state_groups.append(state_group)
+
+                    # Resolve any conflicting state
+                    def fetch(ev_ids):
+                        return self.store.get_events(
+                            ev_ids, get_prev_content=False, check_redacted=False
+                        )
 
-        if fetch_state:
-            # We need to get the state at this event, since we haven't
-            # processed all the prev events.
-            logger.debug(
-                "_handle_new_pdu getting state for %s",
-                pdu.room_id
-            )
-            try:
-                state, auth_chain = yield self.replication_layer.get_state_for_room(
-                    origin, pdu.room_id, pdu.event_id,
-                )
-            except Exception:
-                logger.exception("Failed to get state for event: %s", pdu.event_id)
+                    state_map = yield resolve_events_with_factory(
+                        state_groups, {pdu.event_id: pdu}, fetch
+                    )
+
+                    state = (yield self.store.get_events(state_map.values())).values()
+                    auth_chain = list(auth_chains)
+                except Exception:
+                    raise FederationError(
+                        "ERROR",
+                        403,
+                        "We can't get valid state history.",
+                        affected=pdu.event_id,
+                    )
 
         yield self._process_received_pdu(
             origin,
@@ -320,11 +359,17 @@ class FederationHandler(BaseHandler):
 
         for e in missing_events:
             logger.info("Handling found event %s", e.event_id)
-            yield self.on_receive_pdu(
-                origin,
-                e,
-                get_missing=False
-            )
+            try:
+                yield self.on_receive_pdu(
+                    origin,
+                    e,
+                    get_missing=False
+                )
+            except FederationError as e:
+                if e.code == 403:
+                    logger.warn("Event %s failed history check.")
+                else:
+                    raise
 
     @log_function
     @defer.inlineCallbacks
@@ -441,7 +486,10 @@ class FederationHandler(BaseHandler):
                 # joined the room. Don't bother if the user is just
                 # changing their profile info.
                 newly_joined = True
-                prev_state_id = context.prev_state_ids.get(
+
+                prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+                prev_state_id = prev_state_ids.get(
                     (event.type, event.state_key)
                 )
                 if prev_state_id:
@@ -455,83 +503,6 @@ class FederationHandler(BaseHandler):
                     user = UserID.from_string(event.state_key)
                     yield user_joined_room(self.distributor, user, event.room_id)
 
-    @measure_func("_filter_events_for_server")
-    @defer.inlineCallbacks
-    def _filter_events_for_server(self, server_name, room_id, events):
-        event_to_state_ids = yield self.store.get_state_ids_for_events(
-            frozenset(e.event_id for e in events),
-            types=(
-                (EventTypes.RoomHistoryVisibility, ""),
-                (EventTypes.Member, None),
-            )
-        )
-
-        # We only want to pull out member events that correspond to the
-        # server's domain.
-
-        def check_match(id):
-            try:
-                return server_name == get_domain_from_id(id)
-            except Exception:
-                return False
-
-        # Parses mapping `event_id -> (type, state_key) -> state event_id`
-        # to get all state ids that we're interested in.
-        event_map = yield self.store.get_events([
-            e_id
-            for key_to_eid in list(event_to_state_ids.values())
-            for key, e_id in key_to_eid.items()
-            if key[0] != EventTypes.Member or check_match(key[1])
-        ])
-
-        event_to_state = {
-            e_id: {
-                key: event_map[inner_e_id]
-                for key, inner_e_id in key_to_eid.iteritems()
-                if inner_e_id in event_map
-            }
-            for e_id, key_to_eid in event_to_state_ids.iteritems()
-        }
-
-        def redact_disallowed(event, state):
-            if not state:
-                return event
-
-            history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
-            if history:
-                visibility = history.content.get("history_visibility", "shared")
-                if visibility in ["invited", "joined"]:
-                    # We now loop through all state events looking for
-                    # membership states for the requesting server to determine
-                    # if the server is either in the room or has been invited
-                    # into the room.
-                    for ev in state.itervalues():
-                        if ev.type != EventTypes.Member:
-                            continue
-                        try:
-                            domain = get_domain_from_id(ev.state_key)
-                        except Exception:
-                            continue
-
-                        if domain != server_name:
-                            continue
-
-                        memtype = ev.membership
-                        if memtype == Membership.JOIN:
-                            return event
-                        elif memtype == Membership.INVITE:
-                            if visibility == "invited":
-                                return event
-                    else:
-                        return prune_event(event)
-
-            return event
-
-        defer.returnValue([
-            redact_disallowed(e, event_to_state[e.event_id])
-            for e in events
-        ])
-
     @log_function
     @defer.inlineCallbacks
     def backfill(self, dest, room_id, limit, extremities):
@@ -763,7 +734,7 @@ class FederationHandler(BaseHandler):
             """
             joined_users = [
                 (state_key, int(event.depth))
-                for (e_type, state_key), event in state.iteritems()
+                for (e_type, state_key), event in iteritems(state)
                 if e_type == EventTypes.Member
                 and event.membership == Membership.JOIN
             ]
@@ -780,7 +751,7 @@ class FederationHandler(BaseHandler):
                 except Exception:
                     pass
 
-            return sorted(joined_domains.iteritems(), key=lambda d: d[1])
+            return sorted(joined_domains.items(), key=lambda d: d[1])
 
         curr_domains = get_domains_from_state(curr_state)
 
@@ -843,7 +814,7 @@ class FederationHandler(BaseHandler):
         tried_domains = set(likely_domains)
         tried_domains.add(self.server_name)
 
-        event_ids = list(extremities.iterkeys())
+        event_ids = list(extremities.keys())
 
         logger.debug("calling resolve_state_groups in _maybe_backfill")
         resolve = logcontext.preserve_fn(
@@ -859,15 +830,15 @@ class FederationHandler(BaseHandler):
         states = dict(zip(event_ids, [s.state for s in states]))
 
         state_map = yield self.store.get_events(
-            [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
+            [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
             get_prev_content=False
         )
         states = {
             key: {
                 k: state_map[e_id]
-                for k, e_id in state_dict.iteritems()
+                for k, e_id in iteritems(state_dict)
                 if e_id in state_map
-            } for key, state_dict in states.iteritems()
+            } for key, state_dict in iteritems(states)
         }
 
         for e_id, _ in sorted_extremeties_tuple:
@@ -938,16 +909,6 @@ class FederationHandler(BaseHandler):
             [auth_id for auth_id, _ in event.auth_events],
             include_given=True
         )
-
-        for event in auth:
-            event.signatures.update(
-                compute_event_signature(
-                    event,
-                    self.hs.hostname,
-                    self.hs.config.signing_key[0]
-                )
-            )
-
         defer.returnValue([e for e in auth])
 
     @log_function
@@ -1148,10 +1109,12 @@ class FederationHandler(BaseHandler):
                 user = UserID.from_string(event.state_key)
                 yield user_joined_room(self.distributor, user, event.room_id)
 
-        state_ids = list(context.prev_state_ids.values())
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+        state_ids = list(prev_state_ids.values())
         auth_chain = yield self.store.get_auth_chain(state_ids)
 
-        state = yield self.store.get_events(list(context.prev_state_ids.values()))
+        state = yield self.store.get_events(list(prev_state_ids.values()))
 
         defer.returnValue({
             "state": list(state.values()),
@@ -1316,7 +1279,7 @@ class FederationHandler(BaseHandler):
     @log_function
     def on_make_leave_request(self, room_id, user_id):
         """ We've received a /make_leave/ request, so we create a partial
-        join event for the room and return that. We do *not* persist or
+        leave event for the room and return that. We do *not* persist or
         process it until the other server has signed it and sent it back.
         """
         builder = self.event_builder_factory.new({
@@ -1381,8 +1344,6 @@ class FederationHandler(BaseHandler):
     def get_state_for_pdu(self, room_id, event_id):
         """Returns the state at the event. i.e. not including said event.
         """
-        yield run_on_reactor()
-
         state_groups = yield self.store.get_state_groups(
             room_id, [event_id]
         )
@@ -1405,18 +1366,6 @@ class FederationHandler(BaseHandler):
                     del results[(event.type, event.state_key)]
 
             res = list(results.values())
-            for event in res:
-                # We sign these again because there was a bug where we
-                # incorrectly signed things the first time round
-                if self.is_mine_id(event.event_id):
-                    event.signatures.update(
-                        compute_event_signature(
-                            event,
-                            self.hs.hostname,
-                            self.hs.config.signing_key[0]
-                        )
-                    )
-
             defer.returnValue(res)
         else:
             defer.returnValue([])
@@ -1425,8 +1374,6 @@ class FederationHandler(BaseHandler):
     def get_state_ids_for_pdu(self, room_id, event_id):
         """Returns the state at the event. i.e. not including said event.
         """
-        yield run_on_reactor()
-
         state_groups = yield self.store.get_state_groups_ids(
             room_id, [event_id]
         )
@@ -1462,17 +1409,26 @@ class FederationHandler(BaseHandler):
             limit
         )
 
-        events = yield self._filter_events_for_server(origin, room_id, events)
+        events = yield filter_events_for_server(self.store, origin, events)
 
         defer.returnValue(events)
 
     @defer.inlineCallbacks
     @log_function
-    def get_persisted_pdu(self, origin, event_id, do_auth=True):
-        """ Get a PDU from the database with given origin and id.
+    def get_persisted_pdu(self, origin, event_id):
+        """Get an event from the database for the given server.
+
+        Args:
+            origin [str]: hostname of server which is requesting the event; we
+               will check that the server is allowed to see it.
+            event_id [str]: id of the event being requested
 
         Returns:
-            Deferred: Results in a `Pdu`.
+            Deferred[EventBase|None]: None if we know nothing about the event;
+                otherwise the (possibly-redacted) event.
+
+        Raises:
+            AuthError if the server is not currently in the room
         """
         event = yield self.store.get_event(
             event_id,
@@ -1481,32 +1437,17 @@ class FederationHandler(BaseHandler):
         )
 
         if event:
-            if self.is_mine_id(event.event_id):
-                # FIXME: This is a temporary work around where we occasionally
-                # return events slightly differently than when they were
-                # originally signed
-                event.signatures.update(
-                    compute_event_signature(
-                        event,
-                        self.hs.hostname,
-                        self.hs.config.signing_key[0]
-                    )
-                )
-
-            if do_auth:
-                in_room = yield self.auth.check_host_in_room(
-                    event.room_id,
-                    origin
-                )
-                if not in_room:
-                    raise AuthError(403, "Host not in room.")
-
-                events = yield self._filter_events_for_server(
-                    origin, event.room_id, [event]
-                )
-
-                event = events[0]
+            in_room = yield self.auth.check_host_in_room(
+                event.room_id,
+                origin
+            )
+            if not in_room:
+                raise AuthError(403, "Host not in room.")
 
+            events = yield filter_events_for_server(
+                self.store, origin, [event],
+            )
+            event = events[0]
             defer.returnValue(event)
         else:
             defer.returnValue(None)
@@ -1579,7 +1520,7 @@ class FederationHandler(BaseHandler):
         yield self.store.persist_events(
             [
                 (ev_info["event"], context)
-                for ev_info, context in itertools.izip(event_infos, contexts)
+                for ev_info, context in zip(event_infos, contexts)
             ],
             backfilled=backfilled,
         )
@@ -1699,8 +1640,9 @@ class FederationHandler(BaseHandler):
         )
 
         if not auth_events:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=True,
+                event, prev_state_ids, for_verification=True,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -1760,15 +1702,6 @@ class FederationHandler(BaseHandler):
             local_auth_chain, remote_auth_chain
         )
 
-        for event in ret["auth_chain"]:
-            event.signatures.update(
-                compute_event_signature(
-                    event,
-                    self.hs.hostname,
-                    self.hs.config.signing_key[0]
-                )
-            )
-
         logger.debug("on_query_auth returning: %s", ret)
 
         defer.returnValue(ret)
@@ -1794,8 +1727,8 @@ class FederationHandler(BaseHandler):
             min_depth=min_depth,
         )
 
-        missing_events = yield self._filter_events_for_server(
-            origin, room_id, missing_events,
+        missing_events = yield filter_events_for_server(
+            self.store, origin, missing_events,
         )
 
         defer.returnValue(missing_events)
@@ -1949,9 +1882,10 @@ class FederationHandler(BaseHandler):
                         break
 
             if do_resolution:
+                prev_state_ids = yield context.get_prev_state_ids(self.store)
                 # 1. Get what we think is the auth chain.
                 auth_ids = yield self.auth.compute_auth_events(
-                    event, context.prev_state_ids
+                    event, prev_state_ids
                 )
                 local_auth_chain = yield self.store.get_auth_chain(
                     auth_ids, include_given=True
@@ -2041,21 +1975,34 @@ class FederationHandler(BaseHandler):
             k: a.event_id for k, a in iteritems(auth_events)
             if k != event_key
         }
-        context.current_state_ids = dict(context.current_state_ids)
-        context.current_state_ids.update(state_updates)
-        if context.delta_ids is not None:
-            context.delta_ids = dict(context.delta_ids)
-            context.delta_ids.update(state_updates)
-        context.prev_state_ids = dict(context.prev_state_ids)
-        context.prev_state_ids.update({
+        current_state_ids = yield context.get_current_state_ids(self.store)
+        current_state_ids = dict(current_state_ids)
+
+        current_state_ids.update(state_updates)
+
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        prev_state_ids = dict(prev_state_ids)
+
+        prev_state_ids.update({
             k: a.event_id for k, a in iteritems(auth_events)
         })
-        context.state_group = yield self.store.store_state_group(
+
+        # create a new state group as a delta from the existing one.
+        prev_group = context.state_group
+        state_group = yield self.store.store_state_group(
             event.event_id,
             event.room_id,
-            prev_group=context.prev_group,
-            delta_ids=context.delta_ids,
-            current_state_ids=context.current_state_ids,
+            prev_group=prev_group,
+            delta_ids=state_updates,
+            current_state_ids=current_state_ids,
+        )
+
+        yield context.update_state(
+            state_group=state_group,
+            current_state_ids=current_state_ids,
+            prev_state_ids=prev_state_ids,
+            prev_group=prev_group,
+            delta_ids=state_updates,
         )
 
     @defer.inlineCallbacks
@@ -2295,7 +2242,8 @@ class FederationHandler(BaseHandler):
             event.content["third_party_invite"]["signed"]["token"]
         )
         original_invite = None
-        original_invite_id = context.prev_state_ids.get(key)
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        original_invite_id = prev_state_ids.get(key)
         if original_invite_id:
             original_invite = yield self.store.get_event(
                 original_invite_id, allow_none=True
@@ -2337,7 +2285,8 @@ class FederationHandler(BaseHandler):
         signed = event.content["third_party_invite"]["signed"]
         token = signed["token"]
 
-        invite_event_id = context.prev_state_ids.get(
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        invite_event_id = prev_state_ids.get(
             (EventTypes.ThirdPartyInvite, token,)
         )
 
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index dcae083734..53e5e2648b 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -14,14 +14,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import logging
+
 from six import iteritems
 
+from twisted.internet import defer
+
 from synapse.api.errors import SynapseError
 from synapse.types import get_domain_from_id
 
-import logging
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 91a0898860..8c8aedb2b8 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,16 +19,18 @@
 
 import logging
 
-import simplejson as json
+from canonicaljson import json
 
 from twisted.internet import defer
 
 from synapse.api.errors import (
-    MatrixCodeMessageException, CodeMessageException
+    CodeMessageException,
+    Codes,
+    MatrixCodeMessageException,
+    SynapseError,
 )
+
 from ._base import BaseHandler
-from synapse.util.async import run_on_reactor
-from synapse.api.errors import SynapseError, Codes
 
 logger = logging.getLogger(__name__)
 
@@ -38,6 +41,7 @@ class IdentityHandler(BaseHandler):
         super(IdentityHandler, self).__init__(hs)
 
         self.http_client = hs.get_simple_http_client()
+        self.federation_http_client = hs.get_http_client()
 
         self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
         self.trust_any_id_server_just_for_testing_do_not_use = (
@@ -60,8 +64,6 @@ class IdentityHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def threepid_from_creds(self, creds):
-        yield run_on_reactor()
-
         if 'id_server' in creds:
             id_server = creds['id_server']
         elif 'idServer' in creds:
@@ -104,7 +106,6 @@ class IdentityHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def bind_threepid(self, creds, mxid):
-        yield run_on_reactor()
         logger.debug("binding threepid %r to %s", creds, mxid)
         data = None
 
@@ -139,9 +140,53 @@ class IdentityHandler(BaseHandler):
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
-        yield run_on_reactor()
+    def unbind_threepid(self, mxid, threepid):
+        """
+        Removes a binding from an identity server
+        Args:
+            mxid (str): Matrix user ID of binding to be removed
+            threepid (dict): Dict with medium & address of binding to be removed
+
+        Returns:
+            Deferred[bool]: True on success, otherwise False
+        """
+        logger.debug("unbinding threepid %r from %s", threepid, mxid)
+        if not self.trusted_id_servers:
+            logger.warn("Can't unbind threepid: no trusted ID servers set in config")
+            defer.returnValue(False)
+
+        # We don't track what ID server we added 3pids on (perhaps we ought to)
+        # but we assume that any of the servers in the trusted list are in the
+        # same ID server federation, so we can pick any one of them to send the
+        # deletion request to.
+        id_server = next(iter(self.trusted_id_servers))
+
+        url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
+        content = {
+            "mxid": mxid,
+            "threepid": threepid,
+        }
+        headers = {}
+        # we abuse the federation http client to sign the request, but we have to send it
+        # using the normal http client since we don't want the SRV lookup and want normal
+        # 'browser-like' HTTPS.
+        self.federation_http_client.sign_request(
+            destination=None,
+            method='POST',
+            url_bytes='/_matrix/identity/api/v1/3pid/unbind'.encode('ascii'),
+            headers_dict=headers,
+            content=content,
+            destination_is=id_server,
+        )
+        yield self.http_client.post_json_get_json(
+            url,
+            content,
+            headers,
+        )
+        defer.returnValue(True)
 
+    @defer.inlineCallbacks
+    def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
         if not self._should_trust_id_server(id_server):
             raise SynapseError(
                 400, "Untrusted ID server '%s'" % id_server,
@@ -176,8 +221,6 @@ class IdentityHandler(BaseHandler):
             self, id_server, country, phone_number,
             client_secret, send_attempt, **kwargs
     ):
-        yield run_on_reactor()
-
         if not self._should_trust_id_server(id_server):
             raise SynapseError(
                 400, "Untrusted ID server '%s'" % id_server,
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 71af86fe21..40e7580a61 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
@@ -21,9 +23,7 @@ from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.handlers.presence import format_user_presence_state
 from synapse.streams.config import PaginationConfig
-from synapse.types import (
-    UserID, StreamToken,
-)
+from synapse.types import StreamToken, UserID
 from synapse.util import unwrapFirstError
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.snapshot_cache import SnapshotCache
@@ -32,9 +32,6 @@ from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
-import logging
-
-
 logger = logging.getLogger(__name__)
 
 
@@ -151,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
             try:
                 if event.membership == Membership.JOIN:
                     room_end_token = now_token.room_key
-                    deferred_room_state = self.state_handler.get_current_state(
-                        event.room_id
+                    deferred_room_state = run_in_background(
+                        self.state_handler.get_current_state,
+                        event.room_id,
                     )
                 elif event.membership == Membership.LEAVE:
                     room_end_token = "s%d" % (event.stream_ordering,)
-                    deferred_room_state = self.store.get_state_for_events(
-                        [event.event_id], None
+                    deferred_room_state = run_in_background(
+                        self.store.get_state_for_events,
+                        [event.event_id], None,
                     )
                     deferred_room_state.addCallback(
                         lambda states: states[event.event_id]
@@ -390,19 +389,21 @@ class InitialSyncHandler(BaseHandler):
                 receipts = []
             defer.returnValue(receipts)
 
-        presence, receipts, (messages, token) = yield defer.gatherResults(
-            [
-                run_in_background(get_presence),
-                run_in_background(get_receipts),
-                run_in_background(
-                    self.store.get_recent_events_for_room,
-                    room_id,
-                    limit=limit,
-                    end_token=now_token.room_key,
-                )
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError)
+        presence, receipts, (messages, token) = yield make_deferred_yieldable(
+            defer.gatherResults(
+                [
+                    run_in_background(get_presence),
+                    run_in_background(get_receipts),
+                    run_in_background(
+                        self.store.get_recent_events_for_room,
+                        room_id,
+                        limit=limit,
+                        end_token=now_token.room_key,
+                    )
+                ],
+                consumeErrors=True,
+            ).addErrback(unwrapFirstError),
+        )
 
         messages = yield filter_events_for_client(
             self.store, user_id, messages, is_peeking=is_peeking,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1cb81b6cf8..39d7724778 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,269 +14,43 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import simplejson
 import sys
 
-from canonicaljson import encode_canonical_json
 import six
-from six import string_types, itervalues, iteritems
-from twisted.internet import defer, reactor
+from six import iteritems, itervalues, string_types
+
+from canonicaljson import encode_canonical_json, json
+
+from twisted.internet import defer
 from twisted.internet.defer import succeed
-from twisted.python.failure import Failure
 
-from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
-from synapse.api.errors import (
-    AuthError, Codes, SynapseError,
-    ConsentNotGivenError,
-)
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
+from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
 from synapse.api.urls import ConsentURIBuilder
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
-from synapse.types import (
-    UserID, RoomAlias, RoomStreamToken,
-)
-from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
+from synapse.replication.http.send_event import send_event_to_master
+from synapse.types import RoomAlias, UserID
+from synapse.util.async import Linearizer
+from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
-from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.stringutils import random_string
-from synapse.visibility import filter_events_for_client
-from synapse.replication.http.send_event import send_event_to_master
 
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
 
-class PurgeStatus(object):
-    """Object tracking the status of a purge request
-
-    This class contains information on the progress of a purge request, for
-    return by get_purge_status.
-
-    Attributes:
-        status (int): Tracks whether this request has completed. One of
-            STATUS_{ACTIVE,COMPLETE,FAILED}
+class MessageHandler(object):
+    """Contains some read only APIs to get state about a room
     """
 
-    STATUS_ACTIVE = 0
-    STATUS_COMPLETE = 1
-    STATUS_FAILED = 2
-
-    STATUS_TEXT = {
-        STATUS_ACTIVE: "active",
-        STATUS_COMPLETE: "complete",
-        STATUS_FAILED: "failed",
-    }
-
-    def __init__(self):
-        self.status = PurgeStatus.STATUS_ACTIVE
-
-    def asdict(self):
-        return {
-            "status": PurgeStatus.STATUS_TEXT[self.status]
-        }
-
-
-class MessageHandler(BaseHandler):
-
     def __init__(self, hs):
-        super(MessageHandler, self).__init__(hs)
-        self.hs = hs
-        self.state = hs.get_state_handler()
+        self.auth = hs.get_auth()
         self.clock = hs.get_clock()
-
-        self.pagination_lock = ReadWriteLock()
-        self._purges_in_progress_by_room = set()
-        # map from purge id to PurgeStatus
-        self._purges_by_id = {}
-
-    def start_purge_history(self, room_id, token,
-                            delete_local_events=False):
-        """Start off a history purge on a room.
-
-        Args:
-            room_id (str): The room to purge from
-
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
-
-        Returns:
-            str: unique ID for this purge transaction.
-        """
-        if room_id in self._purges_in_progress_by_room:
-            raise SynapseError(
-                400,
-                "History purge already in progress for %s" % (room_id, ),
-            )
-
-        purge_id = random_string(16)
-
-        # we log the purge_id here so that it can be tied back to the
-        # request id in the log lines.
-        logger.info("[purge] starting purge_id %s", purge_id)
-
-        self._purges_by_id[purge_id] = PurgeStatus()
-        run_in_background(
-            self._purge_history,
-            purge_id, room_id, token, delete_local_events,
-        )
-        return purge_id
-
-    @defer.inlineCallbacks
-    def _purge_history(self, purge_id, room_id, token,
-                       delete_local_events):
-        """Carry out a history purge on a room.
-
-        Args:
-            purge_id (str): The id for this purge
-            room_id (str): The room to purge from
-            token (str): topological token to delete events before
-            delete_local_events (bool): True to delete local events as well as
-                remote ones
-
-        Returns:
-            Deferred
-        """
-        self._purges_in_progress_by_room.add(room_id)
-        try:
-            with (yield self.pagination_lock.write(room_id)):
-                yield self.store.purge_history(
-                    room_id, token, delete_local_events,
-                )
-            logger.info("[purge] complete")
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
-        except Exception:
-            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
-            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
-        finally:
-            self._purges_in_progress_by_room.discard(room_id)
-
-            # remove the purge from the list 24 hours after it completes
-            def clear_purge():
-                del self._purges_by_id[purge_id]
-            reactor.callLater(24 * 3600, clear_purge)
-
-    def get_purge_status(self, purge_id):
-        """Get the current status of an active purge
-
-        Args:
-            purge_id (str): purge_id returned by start_purge_history
-
-        Returns:
-            PurgeStatus|None
-        """
-        return self._purges_by_id.get(purge_id)
-
-    @defer.inlineCallbacks
-    def get_messages(self, requester, room_id=None, pagin_config=None,
-                     as_client_event=True, event_filter=None):
-        """Get messages in a room.
-
-        Args:
-            requester (Requester): The user requesting messages.
-            room_id (str): The room they want messages from.
-            pagin_config (synapse.api.streams.PaginationConfig): The pagination
-                config rules to apply, if any.
-            as_client_event (bool): True to get events in client-server format.
-            event_filter (Filter): Filter to apply to results or None
-        Returns:
-            dict: Pagination API results
-        """
-        user_id = requester.user.to_string()
-
-        if pagin_config.from_token:
-            room_token = pagin_config.from_token.room_key
-        else:
-            pagin_config.from_token = (
-                yield self.hs.get_event_sources().get_current_token_for_room(
-                    room_id=room_id
-                )
-            )
-            room_token = pagin_config.from_token.room_key
-
-        room_token = RoomStreamToken.parse(room_token)
-
-        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
-            "room_key", str(room_token)
-        )
-
-        source_config = pagin_config.get_source_config("room")
-
-        with (yield self.pagination_lock.read(room_id)):
-            membership, member_event_id = yield self._check_in_room_or_world_readable(
-                room_id, user_id
-            )
-
-            if source_config.direction == 'b':
-                # if we're going backwards, we might need to backfill. This
-                # requires that we have a topo token.
-                if room_token.topological:
-                    max_topo = room_token.topological
-                else:
-                    max_topo = yield self.store.get_max_topological_token(
-                        room_id, room_token.stream
-                    )
-
-                if membership == Membership.LEAVE:
-                    # If they have left the room then clamp the token to be before
-                    # they left the room, to save the effort of loading from the
-                    # database.
-                    leave_token = yield self.store.get_topological_token_for_event(
-                        member_event_id
-                    )
-                    leave_token = RoomStreamToken.parse(leave_token)
-                    if leave_token.topological < max_topo:
-                        source_config.from_key = str(leave_token)
-
-                yield self.hs.get_handlers().federation_handler.maybe_backfill(
-                    room_id, max_topo
-                )
-
-            events, next_key = yield self.store.paginate_room_events(
-                room_id=room_id,
-                from_key=source_config.from_key,
-                to_key=source_config.to_key,
-                direction=source_config.direction,
-                limit=source_config.limit,
-                event_filter=event_filter,
-            )
-
-            next_token = pagin_config.from_token.copy_and_replace(
-                "room_key", next_key
-            )
-
-        if not events:
-            defer.returnValue({
-                "chunk": [],
-                "start": pagin_config.from_token.to_string(),
-                "end": next_token.to_string(),
-            })
-
-        if event_filter:
-            events = event_filter.filter(events)
-
-        events = yield filter_events_for_client(
-            self.store,
-            user_id,
-            events,
-            is_peeking=(member_event_id is None),
-        )
-
-        time_now = self.clock.time_msec()
-
-        chunk = {
-            "chunk": [
-                serialize_event(e, time_now, as_client_event)
-                for e in events
-            ],
-            "start": pagin_config.from_token.to_string(),
-            "end": next_token.to_string(),
-        }
-
-        defer.returnValue(chunk)
+        self.state = hs.get_state_handler()
+        self.store = hs.get_datastore()
 
     @defer.inlineCallbacks
     def get_room_data(self, user_id=None, room_id=None,
@@ -290,12 +64,12 @@ class MessageHandler(BaseHandler):
         Raises:
             SynapseError if something went wrong.
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
             room_id, user_id
         )
 
         if membership == Membership.JOIN:
-            data = yield self.state_handler.get_current_state(
+            data = yield self.state.get_current_state(
                 room_id, event_type, state_key
             )
         elif membership == Membership.LEAVE:
@@ -308,31 +82,6 @@ class MessageHandler(BaseHandler):
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def _check_in_room_or_world_readable(self, room_id, user_id):
-        try:
-            # check_user_was_in_room will return the most recent membership
-            # event for the user if:
-            #  * The user is a non-guest user, and was ever in the room
-            #  * The user is a guest user, and has joined the room
-            # else it will throw.
-            member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
-            defer.returnValue((member_event.membership, member_event.event_id))
-            return
-        except AuthError:
-            visibility = yield self.state_handler.get_current_state(
-                room_id, EventTypes.RoomHistoryVisibility, ""
-            )
-            if (
-                visibility and
-                visibility.content["history_visibility"] == "world_readable"
-            ):
-                defer.returnValue((Membership.JOIN, None))
-                return
-            raise AuthError(
-                403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
-            )
-
-    @defer.inlineCallbacks
     def get_state_events(self, user_id, room_id, is_guest=False):
         """Retrieve all state events for a given room. If the user is
         joined to the room then return the current state. If the user has
@@ -344,12 +93,12 @@ class MessageHandler(BaseHandler):
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
         """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
             room_id, user_id
         )
 
         if membership == Membership.JOIN:
-            room_state = yield self.state_handler.get_current_state(room_id)
+            room_state = yield self.state.get_current_state(room_id)
         elif membership == Membership.LEAVE:
             room_state = yield self.store.get_state_for_events(
                 [membership_event_id], None
@@ -377,7 +126,7 @@ class MessageHandler(BaseHandler):
         if not requester.app_service:
             # We check AS auth after fetching the room membership, as it
             # requires us to pull out all joined members anyway.
-            membership, _ = yield self._check_in_room_or_world_readable(
+            membership, _ = yield self.auth.check_in_room_or_world_readable(
                 room_id, user_id
             )
             if membership != Membership.JOIN:
@@ -388,7 +137,7 @@ class MessageHandler(BaseHandler):
         users_with_profile = yield self.state.get_current_user_in_room(room_id)
 
         # If this is an AS, double check that they are allowed to see the members.
-        # This can either be because the AS user is in the room or becuase there
+        # This can either be because the AS user is in the room or because there
         # is a user in the room that the AS is "interested in"
         if requester.app_service and user_id not in users_with_profile:
             for uid in users_with_profile:
@@ -431,7 +180,7 @@ class EventCreationHandler(object):
 
         # We arbitrarily limit concurrent event creation for a room to 5.
         # This is to stop us from diverging history *too* much.
-        self.limiter = Limiter(max_count=5)
+        self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
 
         self.action_generator = hs.get_action_generator()
 
@@ -491,7 +240,7 @@ class EventCreationHandler(object):
                         target, e
                     )
 
-        is_exempt = yield self._is_exempt_from_privacy_policy(builder)
+        is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
         if not is_exempt:
             yield self.assert_accepted_privacy_policy(requester)
 
@@ -509,12 +258,13 @@ class EventCreationHandler(object):
 
         defer.returnValue((event, context))
 
-    def _is_exempt_from_privacy_policy(self, builder):
+    def _is_exempt_from_privacy_policy(self, builder, requester):
         """"Determine if an event to be sent is exempt from having to consent
         to the privacy policy
 
         Args:
             builder (synapse.events.builder.EventBuilder): event being created
+            requester (Requster): user requesting this event
 
         Returns:
             Deferred[bool]: true if the event can be sent without the user
@@ -525,6 +275,9 @@ class EventCreationHandler(object):
             membership = builder.content.get("membership", None)
             if membership == Membership.JOIN:
                 return self._is_server_notices_room(builder.room_id)
+            elif membership == Membership.LEAVE:
+                # the user is always allowed to leave (but not kick people)
+                return builder.state_key == requester.user.to_string()
         return succeed(False)
 
     @defer.inlineCallbacks
@@ -630,7 +383,8 @@ class EventCreationHandler(object):
         If so, returns the version of the event in context.
         Otherwise, returns None.
         """
-        prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        prev_event_id = prev_state_ids.get((event.type, event.state_key))
         prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
         if not prev_event:
             return
@@ -752,8 +506,8 @@ class EventCreationHandler(object):
         event = builder.build()
 
         logger.debug(
-            "Created event %s with state: %s",
-            event.event_id, context.prev_state_ids,
+            "Created event %s",
+            event.event_id,
         )
 
         defer.returnValue(
@@ -793,7 +547,7 @@ class EventCreationHandler(object):
         # Ensure that we can round trip before trying to persist in db
         try:
             dump = frozendict_json_encoder.encode(event.content)
-            simplejson.loads(dump)
+            json.loads(dump)
         except Exception:
             logger.exception("Failed to encode content: %r", event.content)
             raise
@@ -806,7 +560,9 @@ class EventCreationHandler(object):
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
                 yield send_event_to_master(
-                    self.http_client,
+                    clock=self.hs.get_clock(),
+                    store=self.store,
+                    client=self.http_client,
                     host=self.config.worker_replication_host,
                     port=self.config.worker_replication_http_port,
                     requester=requester,
@@ -883,9 +639,11 @@ class EventCreationHandler(object):
                         e.sender == event.sender
                     )
 
+                current_state_ids = yield context.get_current_state_ids(self.store)
+
                 state_to_include_ids = [
                     e_id
-                    for k, e_id in iteritems(context.current_state_ids)
+                    for k, e_id in iteritems(current_state_ids)
                     if k[0] in self.hs.config.room_invite_state_types
                     or k == (EventTypes.Member, event.sender)
                 ]
@@ -921,8 +679,9 @@ class EventCreationHandler(object):
                     )
 
         if event.type == EventTypes.Redaction:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
             auth_events_ids = yield self.auth.compute_auth_events(
-                event, context.prev_state_ids, for_verification=True,
+                event, prev_state_ids, for_verification=True,
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {
@@ -942,11 +701,13 @@ class EventCreationHandler(object):
                         "You don't have permission to redact events"
                     )
 
-        if event.type == EventTypes.Create and context.prev_state_ids:
-            raise AuthError(
-                403,
-                "Changing the room create event is forbidden",
-            )
+        if event.type == EventTypes.Create:
+            prev_state_ids = yield context.get_prev_state_ids(self.store)
+            if prev_state_ids:
+                raise AuthError(
+                    403,
+                    "Changing the room create event is forbidden",
+                )
 
         (event_stream_id, max_stream_id) = yield self.store.persist_event(
             event, context=context
@@ -959,9 +720,7 @@ class EventCreationHandler(object):
             event_stream_id, max_stream_id
         )
 
-        @defer.inlineCallbacks
         def _notify():
-            yield run_on_reactor()
             try:
                 self.notifier.on_new_room_event(
                     event, event_stream_id, max_stream_id,
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
new file mode 100644
index 0000000000..b2849783ed
--- /dev/null
+++ b/synapse/handlers/pagination.py
@@ -0,0 +1,265 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 - 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+from twisted.python.failure import Failure
+
+from synapse.api.constants import Membership
+from synapse.api.errors import SynapseError
+from synapse.events.utils import serialize_event
+from synapse.types import RoomStreamToken
+from synapse.util.async import ReadWriteLock
+from synapse.util.logcontext import run_in_background
+from synapse.util.stringutils import random_string
+from synapse.visibility import filter_events_for_client
+
+logger = logging.getLogger(__name__)
+
+
+class PurgeStatus(object):
+    """Object tracking the status of a purge request
+
+    This class contains information on the progress of a purge request, for
+    return by get_purge_status.
+
+    Attributes:
+        status (int): Tracks whether this request has completed. One of
+            STATUS_{ACTIVE,COMPLETE,FAILED}
+    """
+
+    STATUS_ACTIVE = 0
+    STATUS_COMPLETE = 1
+    STATUS_FAILED = 2
+
+    STATUS_TEXT = {
+        STATUS_ACTIVE: "active",
+        STATUS_COMPLETE: "complete",
+        STATUS_FAILED: "failed",
+    }
+
+    def __init__(self):
+        self.status = PurgeStatus.STATUS_ACTIVE
+
+    def asdict(self):
+        return {
+            "status": PurgeStatus.STATUS_TEXT[self.status]
+        }
+
+
+class PaginationHandler(object):
+    """Handles pagination and purge history requests.
+
+    These are in the same handler due to the fact we need to block clients
+    paginating during a purge.
+    """
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+        self.pagination_lock = ReadWriteLock()
+        self._purges_in_progress_by_room = set()
+        # map from purge id to PurgeStatus
+        self._purges_by_id = {}
+
+    def start_purge_history(self, room_id, token,
+                            delete_local_events=False):
+        """Start off a history purge on a room.
+
+        Args:
+            room_id (str): The room to purge from
+
+            token (str): topological token to delete events before
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            str: unique ID for this purge transaction.
+        """
+        if room_id in self._purges_in_progress_by_room:
+            raise SynapseError(
+                400,
+                "History purge already in progress for %s" % (room_id, ),
+            )
+
+        purge_id = random_string(16)
+
+        # we log the purge_id here so that it can be tied back to the
+        # request id in the log lines.
+        logger.info("[purge] starting purge_id %s", purge_id)
+
+        self._purges_by_id[purge_id] = PurgeStatus()
+        run_in_background(
+            self._purge_history,
+            purge_id, room_id, token, delete_local_events,
+        )
+        return purge_id
+
+    @defer.inlineCallbacks
+    def _purge_history(self, purge_id, room_id, token,
+                       delete_local_events):
+        """Carry out a history purge on a room.
+
+        Args:
+            purge_id (str): The id for this purge
+            room_id (str): The room to purge from
+            token (str): topological token to delete events before
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            Deferred
+        """
+        self._purges_in_progress_by_room.add(room_id)
+        try:
+            with (yield self.pagination_lock.write(room_id)):
+                yield self.store.purge_history(
+                    room_id, token, delete_local_events,
+                )
+            logger.info("[purge] complete")
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+        except Exception:
+            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+        finally:
+            self._purges_in_progress_by_room.discard(room_id)
+
+            # remove the purge from the list 24 hours after it completes
+            def clear_purge():
+                del self._purges_by_id[purge_id]
+            self.hs.get_reactor().callLater(24 * 3600, clear_purge)
+
+    def get_purge_status(self, purge_id):
+        """Get the current status of an active purge
+
+        Args:
+            purge_id (str): purge_id returned by start_purge_history
+
+        Returns:
+            PurgeStatus|None
+        """
+        return self._purges_by_id.get(purge_id)
+
+    @defer.inlineCallbacks
+    def get_messages(self, requester, room_id=None, pagin_config=None,
+                     as_client_event=True, event_filter=None):
+        """Get messages in a room.
+
+        Args:
+            requester (Requester): The user requesting messages.
+            room_id (str): The room they want messages from.
+            pagin_config (synapse.api.streams.PaginationConfig): The pagination
+                config rules to apply, if any.
+            as_client_event (bool): True to get events in client-server format.
+            event_filter (Filter): Filter to apply to results or None
+        Returns:
+            dict: Pagination API results
+        """
+        user_id = requester.user.to_string()
+
+        if pagin_config.from_token:
+            room_token = pagin_config.from_token.room_key
+        else:
+            pagin_config.from_token = (
+                yield self.hs.get_event_sources().get_current_token_for_room(
+                    room_id=room_id
+                )
+            )
+            room_token = pagin_config.from_token.room_key
+
+        room_token = RoomStreamToken.parse(room_token)
+
+        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+            "room_key", str(room_token)
+        )
+
+        source_config = pagin_config.get_source_config("room")
+
+        with (yield self.pagination_lock.read(room_id)):
+            membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
+                room_id, user_id
+            )
+
+            if source_config.direction == 'b':
+                # if we're going backwards, we might need to backfill. This
+                # requires that we have a topo token.
+                if room_token.topological:
+                    max_topo = room_token.topological
+                else:
+                    max_topo = yield self.store.get_max_topological_token(
+                        room_id, room_token.stream
+                    )
+
+                if membership == Membership.LEAVE:
+                    # If they have left the room then clamp the token to be before
+                    # they left the room, to save the effort of loading from the
+                    # database.
+                    leave_token = yield self.store.get_topological_token_for_event(
+                        member_event_id
+                    )
+                    leave_token = RoomStreamToken.parse(leave_token)
+                    if leave_token.topological < max_topo:
+                        source_config.from_key = str(leave_token)
+
+                yield self.hs.get_handlers().federation_handler.maybe_backfill(
+                    room_id, max_topo
+                )
+
+            events, next_key = yield self.store.paginate_room_events(
+                room_id=room_id,
+                from_key=source_config.from_key,
+                to_key=source_config.to_key,
+                direction=source_config.direction,
+                limit=source_config.limit,
+                event_filter=event_filter,
+            )
+
+            next_token = pagin_config.from_token.copy_and_replace(
+                "room_key", next_key
+            )
+
+        if not events:
+            defer.returnValue({
+                "chunk": [],
+                "start": pagin_config.from_token.to_string(),
+                "end": next_token.to_string(),
+            })
+
+        if event_filter:
+            events = event_filter.filter(events)
+
+        events = yield filter_events_for_client(
+            self.store,
+            user_id,
+            events,
+            is_peeking=(member_event_id is None),
+        )
+
+        time_now = self.clock.time_msec()
+
+        chunk = {
+            "chunk": [
+                serialize_event(e, time_now, as_client_event)
+                for e in events
+            ],
+            "start": pagin_config.from_token.to_string(),
+            "end": next_token.to_string(),
+        }
+
+        defer.returnValue(chunk)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7fe568132f..3732830194 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -22,27 +22,26 @@ The methods that define policy are:
     - should_notify
 """
 
-from twisted.internet import defer, reactor
+import logging
 from contextlib import contextmanager
 
-from six import itervalues, iteritems
+from six import iteritems, itervalues
+
+from prometheus_client import Counter
+
+from twisted.internet import defer
 
-from synapse.api.errors import SynapseError
 from synapse.api.constants import PresenceState
+from synapse.api.errors import SynapseError
+from synapse.metrics import LaterGauge
 from synapse.storage.presence import UserPresenceState
-
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.types import UserID, get_domain_from_id
 from synapse.util.async import Linearizer
+from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.logcontext import run_in_background
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID, get_domain_from_id
-from synapse.metrics import LaterGauge
-
-import logging
-
-from prometheus_client import Counter
 
 logger = logging.getLogger(__name__)
 
@@ -179,7 +178,7 @@ class PresenceHandler(object):
         # have not yet been persisted
         self.unpersisted_users_changes = set()
 
-        reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
 
         self.serial_to_user = {}
         self._next_serial = 1
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 3465a787ab..cb5c6d587e 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -17,8 +17,10 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError, AuthError, CodeMessageException
+from synapse.api.errors import AuthError, CodeMessageException, SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import UserID, get_domain_from_id
+
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
@@ -40,7 +42,7 @@ class ProfileHandler(BaseHandler):
 
         if hs.config.worker_app is None:
             self.clock.looping_call(
-                self._update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+                self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
             )
 
     @defer.inlineCallbacks
@@ -253,6 +255,12 @@ class ProfileHandler(BaseHandler):
                     room_id, str(e.message)
                 )
 
+    def _start_update_remote_profile_cache(self):
+        return run_as_background_process(
+            "Update remote profile", self._update_remote_profile_cache,
+        )
+
+    @defer.inlineCallbacks
     def _update_remote_profile_cache(self):
         """Called periodically to check profiles of remote users we haven't
         checked in a while.
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index 5142ae153d..995460f82a 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -13,13 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseHandler
+import logging
 
 from twisted.internet import defer
 
 from synapse.util.async import Linearizer
 
-import logging
+from ._base import BaseHandler
+
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 2e0672161c..cb905a3903 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -12,17 +12,15 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from synapse.util import logcontext
-
-from ._base import BaseHandler
+import logging
 
 from twisted.internet import defer
 
-from synapse.util.logcontext import PreserveLoggingContext
 from synapse.types import get_domain_from_id
+from synapse.util import logcontext
+from synapse.util.logcontext import PreserveLoggingContext
 
-import logging
-
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 7e52adda3c..7caff0cbc8 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -18,14 +18,19 @@ import logging
 
 from twisted.internet import defer
 
+from synapse import types
 from synapse.api.errors import (
-    AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError
+    AuthError,
+    Codes,
+    InvalidCaptchaError,
+    RegistrationError,
+    SynapseError,
 )
 from synapse.http.client import CaptchaServerHttpClient
-from synapse import types
-from synapse.types import UserID, create_requester, RoomID, RoomAlias
-from synapse.util.async import run_on_reactor, Linearizer
+from synapse.types import RoomAlias, RoomID, UserID, create_requester
+from synapse.util.async import Linearizer
 from synapse.util.threepids import check_3pid_allowed
+
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
@@ -139,7 +144,6 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
-        yield run_on_reactor()
         password_hash = None
         if password:
             password_hash = yield self.auth_handler().hash(password)
@@ -431,8 +435,6 @@ class RegistrationHandler(BaseHandler):
         Raises:
             RegistrationError if there was a problem registering.
         """
-        yield run_on_reactor()
-
         if localpart is None:
             raise SynapseError(400, "Request must include user id")
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 2abd63ad05..003b848c00 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -15,23 +15,20 @@
 # limitations under the License.
 
 """Contains functions for performing events on rooms."""
-from twisted.internet import defer
+import logging
+import math
+import string
+from collections import OrderedDict
 
-from ._base import BaseHandler
+from twisted.internet import defer
 
-from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
-from synapse.api.constants import (
-    EventTypes, JoinRules, RoomCreationPreset
-)
-from synapse.api.errors import AuthError, StoreError, SynapseError
+from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
+from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
+from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
 from synapse.visibility import filter_events_for_client
 
-from collections import OrderedDict
-
-import logging
-import math
-import string
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -115,7 +112,11 @@ class RoomCreationHandler(BaseHandler):
             )
 
             if mapping:
-                raise SynapseError(400, "Room alias already taken")
+                raise SynapseError(
+                    400,
+                    "Room alias already taken",
+                    Codes.ROOM_IN_USE
+                )
         else:
             room_alias = None
 
@@ -394,7 +395,11 @@ class RoomCreationHandler(BaseHandler):
             )
 
 
-class RoomContextHandler(BaseHandler):
+class RoomContextHandler(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+
     @defer.inlineCallbacks
     def get_event_context(self, user, room_id, event_id, limit):
         """Retrieves events, pagination tokens and state around a given event
@@ -413,8 +418,6 @@ class RoomContextHandler(BaseHandler):
         before_limit = math.floor(limit / 2.)
         after_limit = limit - before_limit
 
-        now_token = yield self.hs.get_event_sources().get_current_token()
-
         users = yield self.store.get_users_in_room(room_id)
         is_peeking = user.to_string() not in users
 
@@ -457,11 +460,15 @@ class RoomContextHandler(BaseHandler):
         )
         results["state"] = list(state[last_event_id].values())
 
-        results["start"] = now_token.copy_and_replace(
+        # We use a dummy token here as we only care about the room portion of
+        # the token, which we replace.
+        token = StreamToken.START
+
+        results["start"] = token.copy_and_replace(
             "room_key", results["start"]
         ).to_string()
 
-        results["end"] = now_token.copy_and_replace(
+        results["end"] = token.copy_and_replace(
             "room_key", results["end"]
         ).to_string()
 
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index fc507cef36..828229f5c3 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -13,26 +13,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import logging
+from collections import namedtuple
 
 from six import iteritems
 from six.moves import range
 
-from ._base import BaseHandler
+import msgpack
+from unpaddedbase64 import decode_base64, encode_base64
+
+from twisted.internet import defer
 
-from synapse.api.constants import (
-    EventTypes, JoinRules,
-)
+from synapse.api.constants import EventTypes, JoinRules
+from synapse.types import ThirdPartyInstanceID
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.types import ThirdPartyInstanceID
-
-from collections import namedtuple
-from unpaddedbase64 import encode_base64, decode_base64
 
-import logging
-import msgpack
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -40,7 +38,7 @@ REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
 
 
 # This is used to indicate we should only return rooms published to the main list.
-EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
+EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 
 
 class RoomListHandler(BaseHandler):
@@ -52,7 +50,7 @@ class RoomListHandler(BaseHandler):
 
     def get_local_public_room_list(self, limit=None, since_token=None,
                                    search_filter=None,
-                                   network_tuple=EMTPY_THIRD_PARTY_ID,):
+                                   network_tuple=EMPTY_THIRD_PARTY_ID,):
         """Generate a local public room list.
 
         There are multiple different lists: the main one plus one per third
@@ -89,7 +87,7 @@ class RoomListHandler(BaseHandler):
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
                               search_filter=None,
-                              network_tuple=EMTPY_THIRD_PARTY_ID,):
+                              network_tuple=EMPTY_THIRD_PARTY_ID,):
         if since_token and since_token != "END":
             since_token = RoomListNextBatch.from_token(since_token)
         else:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index f930e939e8..a832d91809 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -21,19 +21,17 @@ from six.moves import http_client
 
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
-from twisted.internet import defer
 from unpaddedbase64 import decode_base64
 
+from twisted.internet import defer
+
 import synapse.server
 import synapse.types
-from synapse.api.constants import (
-    EventTypes, Membership,
-)
-from synapse.api.errors import AuthError, SynapseError, Codes
-from synapse.types import UserID, RoomID
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.types import RoomID, UserID
 from synapse.util.async import Linearizer
-from synapse.util.distributor import user_left_room, user_joined_room
-
+from synapse.util.distributor import user_joined_room, user_left_room
 
 logger = logging.getLogger(__name__)
 
@@ -203,7 +201,9 @@ class RoomMemberHandler(object):
             ratelimit=ratelimit,
         )
 
-        prev_member_event_id = context.prev_state_ids.get(
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+        prev_member_event_id = prev_state_ids.get(
             (EventTypes.Member, target.to_string()),
             None
         )
@@ -498,9 +498,10 @@ class RoomMemberHandler(object):
         if prev_event is not None:
             return
 
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
         if event.membership == Membership.JOIN:
             if requester.is_guest:
-                guest_can_join = yield self._can_guest_join(context.prev_state_ids)
+                guest_can_join = yield self._can_guest_join(prev_state_ids)
                 if not guest_can_join:
                     # This should be an auth check, but guests are a local concept,
                     # so don't really fit into the general auth process.
@@ -519,7 +520,7 @@ class RoomMemberHandler(object):
             ratelimit=ratelimit,
         )
 
-        prev_member_event_id = context.prev_state_ids.get(
+        prev_member_event_id = prev_state_ids.get(
             (EventTypes.Member, event.state_key),
             None
         )
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 493aec1e48..22d8b4b0d3 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -20,11 +20,12 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError
 from synapse.handlers.room_member import RoomMemberHandler
 from synapse.replication.http.membership import (
-    remote_join, remote_reject_invite, get_or_register_3pid_guest,
+    get_or_register_3pid_guest,
     notify_user_membership_change,
+    remote_join,
+    remote_reject_invite,
 )
 
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 1eca26aa1e..69ae9731d5 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -13,21 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import itertools
+import logging
 
-from ._base import BaseHandler
+from unpaddedbase64 import decode_base64, encode_base64
 
-from synapse.api.constants import Membership, EventTypes
-from synapse.api.filtering import Filter
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
+from synapse.api.filtering import Filter
 from synapse.events.utils import serialize_event
 from synapse.visibility import filter_events_for_client
 
-from unpaddedbase64 import decode_base64, encode_base64
-
-import itertools
-import logging
-
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -64,6 +63,13 @@ class SearchHandler(BaseHandler):
             except Exception:
                 raise SynapseError(400, "Invalid batch")
 
+        logger.info(
+            "Search batch properties: %r, %r, %r",
+            batch_group, batch_group_key, batch_token,
+        )
+
+        logger.info("Search content: %s", content)
+
         try:
             room_cat = content["search_categories"]["room_events"]
 
@@ -271,6 +277,8 @@ class SearchHandler(BaseHandler):
             # We should never get here due to the guard earlier.
             raise NotImplementedError()
 
+        logger.info("Found %d events to return", len(allowed_events))
+
         # If client has asked for "context" for each event (i.e. some surrounding
         # events and state), fetch that
         if event_context is not None:
@@ -282,6 +290,11 @@ class SearchHandler(BaseHandler):
                     event.room_id, event.event_id, before_limit, after_limit
                 )
 
+                logger.info(
+                    "Context for search returned %d and %d events",
+                    len(res["events_before"]), len(res["events_after"]),
+                )
+
                 res["events_before"] = yield filter_events_for_client(
                     self.store, user.to_string(), res["events_before"]
                 )
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
index e057ae54c9..7ecdede4dc 100644
--- a/synapse/handlers/set_password.py
+++ b/synapse/handlers/set_password.py
@@ -17,6 +17,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.errors import Codes, StoreError, SynapseError
+
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 51ec727df0..dff1f67dcb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
-# Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,24 +14,34 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.api.constants import Membership, EventTypes
+import collections
+import itertools
+import logging
+
+from six import iteritems, itervalues
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.push.clientformat import format_push_rules_for_user
+from synapse.types import RoomStreamToken
 from synapse.util.async import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
+from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
-from synapse.util.caches.response_cache import ResponseCache
-from synapse.push.clientformat import format_push_rules_for_user
 from synapse.visibility import filter_events_for_client
-from synapse.types import RoomStreamToken
-
-from twisted.internet import defer
 
-import collections
-import logging
-import itertools
+logger = logging.getLogger(__name__)
 
-from six import itervalues, iteritems
+# Store the cache that tracks which lazy-loaded members have been sent to a given
+# client for no more than 30 minutes.
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
 
-logger = logging.getLogger(__name__)
+# Remember the last 100 members we sent to a client for the purposes of
+# avoiding redundantly sending the same lazy-loaded members to the client
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
 
 
 SyncConfig = collections.namedtuple("SyncConfig", [
@@ -145,7 +156,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
     "invited",  # InvitedSyncResult for each invited room.
     "archived",  # ArchivedSyncResult for each archived room.
     "to_device",  # List of direct messages for the device.
-    "device_lists",  # List of user_ids whose devices have chanegd
+    "device_lists",  # List of user_ids whose devices have changed
     "device_one_time_keys_count",  # Dict of algorithm to count for one time keys
                                    # for this device
     "groups",
@@ -181,6 +192,12 @@ class SyncHandler(object):
         self.response_cache = ResponseCache(hs, "sync")
         self.state = hs.get_state_handler()
 
+        # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+        self.lazy_loaded_members_cache = ExpiringCache(
+            "lazy_loaded_members_cache", self.clock,
+            max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+        )
+
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
@@ -416,29 +433,44 @@ class SyncHandler(object):
         ))
 
     @defer.inlineCallbacks
-    def get_state_after_event(self, event):
+    def get_state_after_event(self, event, types=None, filtered_types=None):
         """
         Get the room state after the given event
 
         Args:
             event(synapse.events.EventBase): event of interest
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+                May be None, which matches any key.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
         """
-        state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+        state_ids = yield self.store.get_state_ids_for_event(
+            event.event_id, types, filtered_types=filtered_types,
+        )
         if event.is_state():
             state_ids = state_ids.copy()
             state_ids[(event.type, event.state_key)] = event.event_id
         defer.returnValue(state_ids)
 
     @defer.inlineCallbacks
-    def get_state_at(self, room_id, stream_position):
+    def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
         """ Get the room state at a particular stream position
 
         Args:
             room_id(str): room for which to get state
             stream_position(StreamToken): point at which to get state
+            types(list[(str, str|None)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. If `state_key` is None,
+                all events are returned of the given type.
+            filtered_types(list[str]|None): Only apply filtering via `types` to this
+                list of event types.  Other types of events are returned unfiltered.
+                If None, `types` filtering is applied to all events.
 
         Returns:
             A Deferred map from ((type, state_key)->Event)
@@ -453,7 +485,9 @@ class SyncHandler(object):
 
         if last_events:
             last_event = last_events[-1]
-            state = yield self.get_state_after_event(last_event)
+            state = yield self.get_state_after_event(
+                last_event, types, filtered_types=filtered_types,
+            )
 
         else:
             # no events in this room - so presumably no state
@@ -485,59 +519,129 @@ class SyncHandler(object):
         # TODO(mjark) Check for new redactions in the state events.
 
         with Measure(self.clock, "compute_state_delta"):
+
+            types = None
+            filtered_types = None
+
+            lazy_load_members = sync_config.filter_collection.lazy_load_members()
+            include_redundant_members = (
+                sync_config.filter_collection.include_redundant_members()
+            )
+
+            if lazy_load_members:
+                # We only request state for the members needed to display the
+                # timeline:
+
+                types = [
+                    (EventTypes.Member, state_key)
+                    for state_key in set(
+                        event.sender  # FIXME: we also care about invite targets etc.
+                        for event in batch.events
+                    )
+                ]
+
+                # only apply the filtering to room members
+                filtered_types = [EventTypes.Member]
+
+            timeline_state = {
+                (event.type, event.state_key): event.event_id
+                for event in batch.events if event.is_state()
+            }
+
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[-1].event_id
+                        batch.events[-1].event_id, types=types,
+                        filtered_types=filtered_types,
                     )
 
                     state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[0].event_id
+                        batch.events[0].event_id, types=types,
+                        filtered_types=filtered_types,
                     )
+
                 else:
                     current_state_ids = yield self.get_state_at(
-                        room_id, stream_position=now_token
+                        room_id, stream_position=now_token, types=types,
+                        filtered_types=filtered_types,
                     )
 
                     state_ids = current_state_ids
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_ids,
                     previous={},
                     current=current_state_ids,
+                    lazy_load_members=lazy_load_members,
                 )
             elif batch.limited:
                 state_at_previous_sync = yield self.get_state_at(
-                    room_id, stream_position=since_token
+                    room_id, stream_position=since_token, types=types,
+                    filtered_types=filtered_types,
                 )
 
                 current_state_ids = yield self.store.get_state_ids_for_event(
-                    batch.events[-1].event_id
+                    batch.events[-1].event_id, types=types,
+                    filtered_types=filtered_types,
                 )
 
                 state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id
+                    batch.events[0].event_id, types=types,
+                    filtered_types=filtered_types,
                 )
 
-                timeline_state = {
-                    (event.type, event.state_key): event.event_id
-                    for event in batch.events if event.is_state()
-                }
-
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_at_timeline_start,
                     previous=state_at_previous_sync,
                     current=current_state_ids,
+                    lazy_load_members=lazy_load_members,
                 )
             else:
                 state_ids = {}
+                if lazy_load_members:
+                    if types:
+                        state_ids = yield self.store.get_state_ids_for_event(
+                            batch.events[0].event_id, types=types,
+                            filtered_types=filtered_types,
+                        )
+
+            if lazy_load_members and not include_redundant_members:
+                cache_key = (sync_config.user.to_string(), sync_config.device_id)
+                cache = self.lazy_loaded_members_cache.get(cache_key)
+                if cache is None:
+                    logger.debug("creating LruCache for %r", cache_key)
+                    cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+                    self.lazy_loaded_members_cache[cache_key] = cache
+                else:
+                    logger.debug("found LruCache for %r", cache_key)
+
+                # if it's a new sync sequence, then assume the client has had
+                # amnesia and doesn't want any recent lazy-loaded members
+                # de-duplicated.
+                if since_token is None:
+                    logger.debug("clearing LruCache for %r", cache_key)
+                    cache.clear()
+                else:
+                    # only send members which aren't in our LruCache (either
+                    # because they're new to this client or have been pushed out
+                    # of the cache)
+                    logger.debug("filtering state from %r...", state_ids)
+                    state_ids = {
+                        t: event_id
+                        for t, event_id in state_ids.iteritems()
+                        if cache.get(t[1]) != event_id
+                    }
+                    logger.debug("...to %r", state_ids)
+
+                # add any member IDs we are about to send into our LruCache
+                for t, event_id in itertools.chain(
+                    state_ids.items(),
+                    timeline_state.items(),
+                ):
+                    if t[0] == EventTypes.Member:
+                        cache.set(t[1], event_id)
 
         state = {}
         if state_ids:
@@ -1448,7 +1552,9 @@ def _action_has_highlight(actions):
     return False
 
 
-def _calculate_state(timeline_contains, timeline_start, previous, current):
+def _calculate_state(
+    timeline_contains, timeline_start, previous, current, lazy_load_members,
+):
     """Works out what state to include in a sync response.
 
     Args:
@@ -1457,6 +1563,9 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
         previous (dict): state at the end of the previous sync (or empty dict
             if this is an initial sync)
         current (dict): state at the end of the timeline
+        lazy_load_members (bool): whether to return members from timeline_start
+            or not.  assumes that timeline_start has already been filtered to
+            include only the members the client needs to know about.
 
     Returns:
         dict
@@ -1472,9 +1581,25 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
     }
 
     c_ids = set(e for e in current.values())
-    tc_ids = set(e for e in timeline_contains.values())
-    p_ids = set(e for e in previous.values())
     ts_ids = set(e for e in timeline_start.values())
+    p_ids = set(e for e in previous.values())
+    tc_ids = set(e for e in timeline_contains.values())
+
+    # If we are lazyloading room members, we explicitly add the membership events
+    # for the senders in the timeline into the state block returned by /sync,
+    # as we may not have sent them to the client before.  We find these membership
+    # events by filtering them out of timeline_start, which has already been filtered
+    # to only include membership events for the senders in the timeline.
+    # In practice, we can do this by removing them from the p_ids list,
+    # which is the list of relevant state we know we have already sent to the client.
+    # see https://github.com/matrix-org/synapse/pull/2970
+    #            /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
+
+    if lazy_load_members:
+        p_ids.difference_update(
+            e for t, e in timeline_start.iteritems()
+            if t[0] == EventTypes.Member
+        )
 
     state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 5d9736e88f..2d2d3d5a0d 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -13,17 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+from collections import namedtuple
+
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError, AuthError
+from synapse.api.errors import AuthError, SynapseError
+from synapse.types import UserID, get_domain_from_id
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID, get_domain_from_id
-
-import logging
-
-from collections import namedtuple
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index a39f0f7343..37dda64587 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -14,15 +14,15 @@
 # limitations under the License.
 
 import logging
+
+from six import iteritems
+
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.storage.roommember import ProfileInfo
-from synapse.util.metrics import Measure
-from synapse.util.async import sleep
 from synapse.types import get_localpart_from_id
-
-from six import iteritems
+from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
@@ -174,7 +174,7 @@ class UserDirectoryHandler(object):
             logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
             yield self._handle_initial_room(room_id)
             num_processed_rooms += 1
-            yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
 
         logger.info("Processed all rooms.")
 
@@ -188,7 +188,7 @@ class UserDirectoryHandler(object):
                 logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
                 yield self._handle_local_user(user_id)
                 num_processed_users += 1
-                yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
+                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
 
             logger.info("Processed all users")
 
@@ -236,7 +236,7 @@ class UserDirectoryHandler(object):
         count = 0
         for user_id in user_ids:
             if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
 
             if not self.is_mine_id(user_id):
                 count += 1
@@ -251,7 +251,7 @@ class UserDirectoryHandler(object):
                     continue
 
                 if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
+                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
                 count += 1
 
                 user_set = (user_id, other_user_id)