summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/__init__.py2
-rw-r--r--synapse/handlers/_base.py2
-rw-r--r--synapse/handlers/appservice.py7
-rw-r--r--synapse/handlers/auth.py28
-rw-r--r--synapse/handlers/device.py13
-rw-r--r--synapse/handlers/devicemessage.py16
-rw-r--r--synapse/handlers/directory.py11
-rw-r--r--synapse/handlers/e2e_keys.py25
-rw-r--r--synapse/handlers/federation.py92
-rw-r--r--synapse/handlers/groups_local.py7
-rw-r--r--synapse/handlers/message.py514
-rw-r--r--synapse/handlers/presence.py11
-rw-r--r--synapse/handlers/profile.py11
-rw-r--r--synapse/handlers/read_marker.py6
-rw-r--r--synapse/handlers/receipts.py2
-rw-r--r--synapse/handlers/register.py36
-rw-r--r--synapse/handlers/room.py24
-rw-r--r--synapse/handlers/room_list.py5
-rw-r--r--synapse/handlers/room_member.py334
-rw-r--r--synapse/handlers/room_member_worker.py102
-rw-r--r--synapse/handlers/set_password.py2
-rw-r--r--synapse/handlers/sync.py101
-rw-r--r--synapse/handlers/typing.py2
23 files changed, 939 insertions, 414 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 53213cdccf..8f8fd82eb0 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,7 +17,6 @@ from .register import RegistrationHandler
 from .room import (
     RoomCreationHandler, RoomContextHandler,
 )
-from .room_member import RoomMemberHandler
 from .message import MessageHandler
 from .federation import FederationHandler
 from .directory import DirectoryHandler
@@ -49,7 +48,6 @@ class Handlers(object):
         self.registration_handler = RegistrationHandler(hs)
         self.message_handler = MessageHandler(hs)
         self.room_creation_handler = RoomCreationHandler(hs)
-        self.room_member_handler = RoomMemberHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index faa5609c0c..e089e66fde 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -158,7 +158,7 @@ class BaseHandler(object):
                 # homeserver.
                 requester = synapse.types.create_requester(
                     target_user, is_guest=True)
-                handler = self.hs.get_handlers().room_member_handler
+                handler = self.hs.get_room_member_handler()
                 yield handler.update_membership(
                     requester,
                     target_user,
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index feca3e4c10..3dd3fa2a27 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -15,6 +15,7 @@
 
 from twisted.internet import defer
 
+import synapse
 from synapse.api.constants import EventTypes
 from synapse.util.metrics import Measure
 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
@@ -23,6 +24,10 @@ import logging
 
 logger = logging.getLogger(__name__)
 
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+events_processed_counter = metrics.register_counter("events_processed")
+
 
 def log_failure(failure):
     logger.error(
@@ -103,6 +108,8 @@ class ApplicationServicesHandler(object):
                                 service, event
                             )
 
+                    events_processed_counter.inc_by(len(events))
+
                     yield self.store.set_appservice_last_pos(upper_bound)
             finally:
                 self.is_processing = False
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 573c9db8a1..a5365c4fe4 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -13,7 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from twisted.internet import defer
+from twisted.internet import defer, threads
 
 from ._base import BaseHandler
 from synapse.api.constants import LoginType
@@ -25,6 +25,7 @@ from synapse.module_api import ModuleApi
 from synapse.types import UserID
 from synapse.util.async import run_on_reactor
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.logcontext import make_deferred_yieldable
 
 from twisted.web.client import PartialDownloadError
 
@@ -714,7 +715,7 @@ class AuthHandler(BaseHandler):
         if not lookupres:
             defer.returnValue(None)
         (user_id, password_hash) = lookupres
-        result = self.validate_hash(password, password_hash)
+        result = yield self.validate_hash(password, password_hash)
         if not result:
             logger.warn("Failed password login for user %s", user_id)
             defer.returnValue(None)
@@ -842,10 +843,13 @@ class AuthHandler(BaseHandler):
             password (str): Password to hash.
 
         Returns:
-            Hashed password (str).
+            Deferred(str): Hashed password.
         """
-        return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
-                             bcrypt.gensalt(self.bcrypt_rounds))
+        def _do_hash():
+            return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
+                                 bcrypt.gensalt(self.bcrypt_rounds))
+
+        return make_deferred_yieldable(threads.deferToThread(_do_hash))
 
     def validate_hash(self, password, stored_hash):
         """Validates that self.hash(password) == stored_hash.
@@ -855,13 +859,19 @@ class AuthHandler(BaseHandler):
             stored_hash (str): Expected hash value.
 
         Returns:
-            Whether self.hash(password) == stored_hash (bool).
+            Deferred(bool): Whether self.hash(password) == stored_hash.
         """
+
+        def _do_validate_hash():
+            return bcrypt.checkpw(
+                password.encode('utf8') + self.hs.config.password_pepper,
+                stored_hash.encode('utf8')
+            )
+
         if stored_hash:
-            return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper,
-                                 stored_hash.encode('utf8')) == stored_hash
+            return make_deferred_yieldable(threads.deferToThread(_do_validate_hash))
         else:
-            return False
+            return defer.succeed(False)
 
 
 class MacaroonGeneartor(object):
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 2152efc692..40f3d24678 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 from synapse.api import errors
 from synapse.api.constants import EventTypes
+from synapse.api.errors import FederationDeniedError
 from synapse.util import stringutils
 from synapse.util.async import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -36,14 +37,15 @@ class DeviceHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self._auth_handler = hs.get_auth_handler()
         self.federation_sender = hs.get_federation_sender()
-        self.federation = hs.get_replication_layer()
 
         self._edu_updater = DeviceListEduUpdater(hs, self)
 
-        self.federation.register_edu_handler(
+        federation_registry = hs.get_federation_registry()
+
+        federation_registry.register_edu_handler(
             "m.device_list_update", self._edu_updater.incoming_device_list_update,
         )
-        self.federation.register_query_handler(
+        federation_registry.register_query_handler(
             "user_devices", self.on_federation_query_user_devices,
         )
 
@@ -429,7 +431,7 @@ class DeviceListEduUpdater(object):
 
     def __init__(self, hs, device_handler):
         self.store = hs.get_datastore()
-        self.federation = hs.get_replication_layer()
+        self.federation = hs.get_federation_client()
         self.clock = hs.get_clock()
         self.device_handler = device_handler
 
@@ -513,6 +515,9 @@ class DeviceListEduUpdater(object):
                     # This makes it more likely that the device lists will
                     # eventually become consistent.
                     return
+                except FederationDeniedError as e:
+                    logger.info(e)
+                    return
                 except Exception:
                     # TODO: Remember that we are now out of sync and try again
                     # later
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index f7fad15c62..f147a20b73 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -17,7 +17,8 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.types import get_domain_from_id
+from synapse.api.errors import SynapseError
+from synapse.types import get_domain_from_id, UserID
 from synapse.util.stringutils import random_string
 
 
@@ -33,10 +34,10 @@ class DeviceMessageHandler(object):
         """
         self.store = hs.get_datastore()
         self.notifier = hs.get_notifier()
-        self.is_mine_id = hs.is_mine_id
+        self.is_mine = hs.is_mine
         self.federation = hs.get_federation_sender()
 
-        hs.get_replication_layer().register_edu_handler(
+        hs.get_federation_registry().register_edu_handler(
             "m.direct_to_device", self.on_direct_to_device_edu
         )
 
@@ -52,6 +53,12 @@ class DeviceMessageHandler(object):
         message_type = content["type"]
         message_id = content["message_id"]
         for user_id, by_device in content["messages"].items():
+            # we use UserID.from_string to catch invalid user ids
+            if not self.is_mine(UserID.from_string(user_id)):
+                logger.warning("Request for keys for non-local user %s",
+                               user_id)
+                raise SynapseError(400, "Not a user here")
+
             messages_by_device = {
                 device_id: {
                     "content": message_content,
@@ -77,7 +84,8 @@ class DeviceMessageHandler(object):
         local_messages = {}
         remote_messages = {}
         for user_id, by_device in messages.items():
-            if self.is_mine_id(user_id):
+            # we use UserID.from_string to catch invalid user ids
+            if self.is_mine(UserID.from_string(user_id)):
                 messages_by_device = {
                     device_id: {
                         "content": message_content,
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index a0464ae5c0..c5b6e75e03 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -34,9 +34,10 @@ class DirectoryHandler(BaseHandler):
 
         self.state = hs.get_state_handler()
         self.appservice_handler = hs.get_application_service_handler()
+        self.event_creation_handler = hs.get_event_creation_handler()
 
-        self.federation = hs.get_replication_layer()
-        self.federation.register_query_handler(
+        self.federation = hs.get_federation_client()
+        hs.get_federation_registry().register_query_handler(
             "directory", self.on_directory_query
         )
 
@@ -249,8 +250,7 @@ class DirectoryHandler(BaseHandler):
     def send_room_alias_update_event(self, requester, user_id, room_id):
         aliases = yield self.store.get_aliases_for_room(room_id)
 
-        msg_handler = self.hs.get_handlers().message_handler
-        yield msg_handler.create_and_send_nonmember_event(
+        yield self.event_creation_handler.create_and_send_nonmember_event(
             requester,
             {
                 "type": EventTypes.Aliases,
@@ -272,8 +272,7 @@ class DirectoryHandler(BaseHandler):
         if not alias_event or alias_event.content.get("alias", "") != alias_str:
             return
 
-        msg_handler = self.hs.get_handlers().message_handler
-        yield msg_handler.create_and_send_nonmember_event(
+        yield self.event_creation_handler.create_and_send_nonmember_event(
             requester,
             {
                 "type": EventTypes.CanonicalAlias,
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index ce2c87e400..80b359b2e7 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -19,8 +19,10 @@ import logging
 from canonicaljson import encode_canonical_json
 from twisted.internet import defer
 
-from synapse.api.errors import SynapseError, CodeMessageException
-from synapse.types import get_domain_from_id
+from synapse.api.errors import (
+    SynapseError, CodeMessageException, FederationDeniedError,
+)
+from synapse.types import get_domain_from_id, UserID
 from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
 from synapse.util.retryutils import NotRetryingDestination
 
@@ -30,15 +32,15 @@ logger = logging.getLogger(__name__)
 class E2eKeysHandler(object):
     def __init__(self, hs):
         self.store = hs.get_datastore()
-        self.federation = hs.get_replication_layer()
+        self.federation = hs.get_federation_client()
         self.device_handler = hs.get_device_handler()
-        self.is_mine_id = hs.is_mine_id
+        self.is_mine = hs.is_mine
         self.clock = hs.get_clock()
 
         # doesn't really work as part of the generic query API, because the
         # query request requires an object POST, but we abuse the
         # "query handler" interface.
-        self.federation.register_query_handler(
+        hs.get_federation_registry().register_query_handler(
             "client_keys", self.on_federation_query_client_keys
         )
 
@@ -70,7 +72,8 @@ class E2eKeysHandler(object):
         remote_queries = {}
 
         for user_id, device_ids in device_keys_query.items():
-            if self.is_mine_id(user_id):
+            # we use UserID.from_string to catch invalid user ids
+            if self.is_mine(UserID.from_string(user_id)):
                 local_query[user_id] = device_ids
             else:
                 remote_queries[user_id] = device_ids
@@ -139,6 +142,10 @@ class E2eKeysHandler(object):
                 failures[destination] = {
                     "status": 503, "message": "Not ready for retry",
                 }
+            except FederationDeniedError as e:
+                failures[destination] = {
+                    "status": 403, "message": "Federation Denied",
+                }
             except Exception as e:
                 # include ConnectionRefused and other errors
                 failures[destination] = {
@@ -170,7 +177,8 @@ class E2eKeysHandler(object):
 
         result_dict = {}
         for user_id, device_ids in query.items():
-            if not self.is_mine_id(user_id):
+            # we use UserID.from_string to catch invalid user ids
+            if not self.is_mine(UserID.from_string(user_id)):
                 logger.warning("Request for keys for non-local user %s",
                                user_id)
                 raise SynapseError(400, "Not a user here")
@@ -213,7 +221,8 @@ class E2eKeysHandler(object):
         remote_queries = {}
 
         for user_id, device_keys in query.get("one_time_keys", {}).items():
-            if self.is_mine_id(user_id):
+            # we use UserID.from_string to catch invalid user ids
+            if self.is_mine(UserID.from_string(user_id)):
                 for device_id, algorithm in device_keys.items():
                     local_query.append((user_id, device_id, algorithm))
             else:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ac70730885..080aca3d71 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -22,6 +23,7 @@ from ._base import BaseHandler
 
 from synapse.api.errors import (
     AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
+    FederationDeniedError,
 )
 from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.events.validator import EventValidator
@@ -66,7 +68,7 @@ class FederationHandler(BaseHandler):
         self.hs = hs
 
         self.store = hs.get_datastore()
-        self.replication_layer = hs.get_replication_layer()
+        self.replication_layer = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
@@ -74,8 +76,7 @@ class FederationHandler(BaseHandler):
         self.is_mine_id = hs.is_mine_id
         self.pusher_pool = hs.get_pusherpool()
         self.spam_checker = hs.get_spam_checker()
-
-        self.replication_layer.set_handler(self)
+        self.event_creation_handler = hs.get_event_creation_handler()
 
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
@@ -782,6 +783,9 @@ class FederationHandler(BaseHandler):
                 except NotRetryingDestination as e:
                     logger.info(e.message)
                     continue
+                except FederationDeniedError as e:
+                    logger.info(e)
+                    continue
                 except Exception as e:
                     logger.exception(
                         "Failed to backfill from %s because %s",
@@ -804,13 +808,12 @@ class FederationHandler(BaseHandler):
         event_ids = list(extremities.keys())
 
         logger.debug("calling resolve_state_groups in _maybe_backfill")
+        resolve = logcontext.preserve_fn(
+            self.state_handler.resolve_state_groups_for_events
+        )
         states = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                logcontext.preserve_fn(self.state_handler.resolve_state_groups)(
-                    room_id, [e]
-                )
-                for e in event_ids
-            ], consumeErrors=True,
+            [resolve(room_id, [e]) for e in event_ids],
+            consumeErrors=True,
         ))
         states = dict(zip(event_ids, [s.state for s in states]))
 
@@ -1004,8 +1007,7 @@ class FederationHandler(BaseHandler):
         })
 
         try:
-            message_handler = self.hs.get_handlers().message_handler
-            event, context = yield message_handler._create_new_client_event(
+            event, context = yield self.event_creation_handler.create_new_client_event(
                 builder=builder,
             )
         except AuthError as e:
@@ -1245,8 +1247,7 @@ class FederationHandler(BaseHandler):
             "state_key": user_id,
         })
 
-        message_handler = self.hs.get_handlers().message_handler
-        event, context = yield message_handler._create_new_client_event(
+        event, context = yield self.event_creation_handler.create_new_client_event(
             builder=builder,
         )
 
@@ -1444,16 +1445,24 @@ class FederationHandler(BaseHandler):
             auth_events=auth_events,
         )
 
-        if not event.internal_metadata.is_outlier() and not backfilled:
-            yield self.action_generator.handle_push_actions_for_event(
-                event, context
-            )
+        try:
+            if not event.internal_metadata.is_outlier() and not backfilled:
+                yield self.action_generator.handle_push_actions_for_event(
+                    event, context
+                )
 
-        event_stream_id, max_stream_id = yield self.store.persist_event(
-            event,
-            context=context,
-            backfilled=backfilled,
-        )
+            event_stream_id, max_stream_id = yield self.store.persist_event(
+                event,
+                context=context,
+                backfilled=backfilled,
+            )
+        except:  # noqa: E722, as we reraise the exception this is fine.
+            # Ensure that we actually remove the entries in the push actions
+            # staging area
+            logcontext.preserve_fn(
+                self.store.remove_push_actions_from_staging
+            )(event.event_id)
+            raise
 
         if not backfilled:
             # this intentionally does not yield: we don't care about the result
@@ -1828,8 +1837,8 @@ class FederationHandler(BaseHandler):
                 current_state = set(e.event_id for e in auth_events.values())
                 different_auth = event_auth_events - current_state
 
-                self._update_context_for_auth_events(
-                    context, auth_events, event_key,
+                yield self._update_context_for_auth_events(
+                    event, context, auth_events, event_key,
                 )
 
         if different_auth and not event.internal_metadata.is_outlier():
@@ -1910,8 +1919,8 @@ class FederationHandler(BaseHandler):
                 # 4. Look at rejects and their proofs.
                 # TODO.
 
-                self._update_context_for_auth_events(
-                    context, auth_events, event_key,
+                yield self._update_context_for_auth_events(
+                    event, context, auth_events, event_key,
                 )
 
         try:
@@ -1920,11 +1929,15 @@ class FederationHandler(BaseHandler):
             logger.warn("Failed auth resolution for %r because %s", event, e)
             raise e
 
-    def _update_context_for_auth_events(self, context, auth_events,
+    @defer.inlineCallbacks
+    def _update_context_for_auth_events(self, event, context, auth_events,
                                         event_key):
-        """Update the state_ids in an event context after auth event resolution
+        """Update the state_ids in an event context after auth event resolution,
+        storing the changes as a new state group.
 
         Args:
+            event (Event): The event we're handling the context for
+
             context (synapse.events.snapshot.EventContext): event context
                 to be updated
 
@@ -1947,7 +1960,13 @@ class FederationHandler(BaseHandler):
         context.prev_state_ids.update({
             k: a.event_id for k, a in auth_events.iteritems()
         })
-        context.state_group = self.store.get_next_state_group()
+        context.state_group = yield self.store.store_state_group(
+            event.event_id,
+            event.room_id,
+            prev_group=context.prev_group,
+            delta_ids=context.delta_ids,
+            current_state_ids=context.current_state_ids,
+        )
 
     @defer.inlineCallbacks
     def construct_auth_difference(self, local_auth, remote_auth):
@@ -2117,8 +2136,7 @@ class FederationHandler(BaseHandler):
         if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
             builder = self.event_builder_factory.new(event_dict)
             EventValidator().validate_new(builder)
-            message_handler = self.hs.get_handlers().message_handler
-            event, context = yield message_handler._create_new_client_event(
+            event, context = yield self.event_creation_handler.create_new_client_event(
                 builder=builder
             )
 
@@ -2133,7 +2151,7 @@ class FederationHandler(BaseHandler):
                 raise e
 
             yield self._check_signature(event, context)
-            member_handler = self.hs.get_handlers().room_member_handler
+            member_handler = self.hs.get_room_member_handler()
             yield member_handler.send_membership_event(None, event, context)
         else:
             destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
@@ -2156,8 +2174,7 @@ class FederationHandler(BaseHandler):
         """
         builder = self.event_builder_factory.new(event_dict)
 
-        message_handler = self.hs.get_handlers().message_handler
-        event, context = yield message_handler._create_new_client_event(
+        event, context = yield self.event_creation_handler.create_new_client_event(
             builder=builder,
         )
 
@@ -2178,7 +2195,7 @@ class FederationHandler(BaseHandler):
         # TODO: Make sure the signatures actually are correct.
         event.signatures.update(returned_invite.signatures)
 
-        member_handler = self.hs.get_handlers().room_member_handler
+        member_handler = self.hs.get_room_member_handler()
         yield member_handler.send_membership_event(None, event, context)
 
     @defer.inlineCallbacks
@@ -2207,8 +2224,9 @@ class FederationHandler(BaseHandler):
 
         builder = self.event_builder_factory.new(event_dict)
         EventValidator().validate_new(builder)
-        message_handler = self.hs.get_handlers().message_handler
-        event, context = yield message_handler._create_new_client_event(builder=builder)
+        event, context = yield self.event_creation_handler.create_new_client_event(
+            builder=builder,
+        )
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 7e5d3f148d..e4d0cc8b02 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -383,11 +383,12 @@ class GroupsLocalHandler(object):
 
             defer.returnValue({"groups": result})
         else:
-            result = yield self.transport_client.get_publicised_groups_for_user(
-                get_domain_from_id(user_id), user_id
+            bulk_result = yield self.transport_client.bulk_get_publicised_groups(
+                get_domain_from_id(user_id), [user_id],
             )
+            result = bulk_result.get("users", {}).get(user_id)
             # TODO: Verify attestations
-            defer.returnValue(result)
+            defer.returnValue({"groups": result})
 
     @defer.inlineCallbacks
     def bulk_get_publicised_groups(self, user_ids, proxy=True):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index d7413833ed..5a8ddc253e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
-# Copyright 2017 New Vector Ltd
+# Copyright 2017 - 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,7 +13,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from twisted.internet import defer
+from twisted.internet import defer, reactor
+from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
@@ -24,10 +25,12 @@ from synapse.types import (
     UserID, RoomAlias, RoomStreamToken,
 )
 from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, run_in_background
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import unfreeze
+from synapse.util.stringutils import random_string
 from synapse.visibility import filter_events_for_client
+from synapse.replication.http.send_event import send_event_to_master
 
 from ._base import BaseHandler
 
@@ -40,6 +43,36 @@ import simplejson
 logger = logging.getLogger(__name__)
 
 
+class PurgeStatus(object):
+    """Object tracking the status of a purge request
+
+    This class contains information on the progress of a purge request, for
+    return by get_purge_status.
+
+    Attributes:
+        status (int): Tracks whether this request has completed. One of
+            STATUS_{ACTIVE,COMPLETE,FAILED}
+    """
+
+    STATUS_ACTIVE = 0
+    STATUS_COMPLETE = 1
+    STATUS_FAILED = 2
+
+    STATUS_TEXT = {
+        STATUS_ACTIVE: "active",
+        STATUS_COMPLETE: "complete",
+        STATUS_FAILED: "failed",
+    }
+
+    def __init__(self):
+        self.status = PurgeStatus.STATUS_ACTIVE
+
+    def asdict(self):
+        return {
+            "status": PurgeStatus.STATUS_TEXT[self.status]
+        }
+
+
 class MessageHandler(BaseHandler):
 
     def __init__(self, hs):
@@ -47,32 +80,89 @@ class MessageHandler(BaseHandler):
         self.hs = hs
         self.state = hs.get_state_handler()
         self.clock = hs.get_clock()
-        self.validator = EventValidator()
-        self.profile_handler = hs.get_profile_handler()
 
         self.pagination_lock = ReadWriteLock()
+        self._purges_in_progress_by_room = set()
+        # map from purge id to PurgeStatus
+        self._purges_by_id = {}
 
-        self.pusher_pool = hs.get_pusherpool()
+    def start_purge_history(self, room_id, topological_ordering,
+                            delete_local_events=False):
+        """Start off a history purge on a room.
 
-        # We arbitrarily limit concurrent event creation for a room to 5.
-        # This is to stop us from diverging history *too* much.
-        self.limiter = Limiter(max_count=5)
+        Args:
+            room_id (str): The room to purge from
 
-        self.action_generator = hs.get_action_generator()
+            topological_ordering (int): minimum topo ordering to preserve
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
 
-        self.spam_checker = hs.get_spam_checker()
+        Returns:
+            str: unique ID for this purge transaction.
+        """
+        if room_id in self._purges_in_progress_by_room:
+            raise SynapseError(
+                400,
+                "History purge already in progress for %s" % (room_id, ),
+            )
+
+        purge_id = random_string(16)
+
+        # we log the purge_id here so that it can be tied back to the
+        # request id in the log lines.
+        logger.info("[purge] starting purge_id %s", purge_id)
+
+        self._purges_by_id[purge_id] = PurgeStatus()
+        run_in_background(
+            self._purge_history,
+            purge_id, room_id, topological_ordering, delete_local_events,
+        )
+        return purge_id
 
     @defer.inlineCallbacks
-    def purge_history(self, room_id, event_id):
-        event = yield self.store.get_event(event_id)
+    def _purge_history(self, purge_id, room_id, topological_ordering,
+                       delete_local_events):
+        """Carry out a history purge on a room.
+
+        Args:
+            purge_id (str): The id for this purge
+            room_id (str): The room to purge from
+            topological_ordering (int): minimum topo ordering to preserve
+            delete_local_events (bool): True to delete local events as well as
+                remote ones
+
+        Returns:
+            Deferred
+        """
+        self._purges_in_progress_by_room.add(room_id)
+        try:
+            with (yield self.pagination_lock.write(room_id)):
+                yield self.store.purge_history(
+                    room_id, topological_ordering, delete_local_events,
+                )
+            logger.info("[purge] complete")
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+        except Exception:
+            logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+        finally:
+            self._purges_in_progress_by_room.discard(room_id)
 
-        if event.room_id != room_id:
-            raise SynapseError(400, "Event is for wrong room.")
+            # remove the purge from the list 24 hours after it completes
+            def clear_purge():
+                del self._purges_by_id[purge_id]
+            reactor.callLater(24 * 3600, clear_purge)
 
-        depth = event.depth
+    def get_purge_status(self, purge_id):
+        """Get the current status of an active purge
 
-        with (yield self.pagination_lock.write(room_id)):
-            yield self.store.delete_old_state(room_id, depth)
+        Args:
+            purge_id (str): purge_id returned by start_purge_history
+
+        Returns:
+            PurgeStatus|None
+        """
+        return self._purges_by_id.get(purge_id)
 
     @defer.inlineCallbacks
     def get_messages(self, requester, room_id=None, pagin_config=None,
@@ -183,6 +273,165 @@ class MessageHandler(BaseHandler):
         defer.returnValue(chunk)
 
     @defer.inlineCallbacks
+    def get_room_data(self, user_id=None, room_id=None,
+                      event_type=None, state_key="", is_guest=False):
+        """ Get data from a room.
+
+        Args:
+            event : The room path event
+        Returns:
+            The path data content.
+        Raises:
+            SynapseError if something went wrong.
+        """
+        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+            room_id, user_id
+        )
+
+        if membership == Membership.JOIN:
+            data = yield self.state_handler.get_current_state(
+                room_id, event_type, state_key
+            )
+        elif membership == Membership.LEAVE:
+            key = (event_type, state_key)
+            room_state = yield self.store.get_state_for_events(
+                [membership_event_id], [key]
+            )
+            data = room_state[membership_event_id].get(key)
+
+        defer.returnValue(data)
+
+    @defer.inlineCallbacks
+    def _check_in_room_or_world_readable(self, room_id, user_id):
+        try:
+            # check_user_was_in_room will return the most recent membership
+            # event for the user if:
+            #  * The user is a non-guest user, and was ever in the room
+            #  * The user is a guest user, and has joined the room
+            # else it will throw.
+            member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
+            defer.returnValue((member_event.membership, member_event.event_id))
+            return
+        except AuthError:
+            visibility = yield self.state_handler.get_current_state(
+                room_id, EventTypes.RoomHistoryVisibility, ""
+            )
+            if (
+                visibility and
+                visibility.content["history_visibility"] == "world_readable"
+            ):
+                defer.returnValue((Membership.JOIN, None))
+                return
+            raise AuthError(
+                403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
+            )
+
+    @defer.inlineCallbacks
+    def get_state_events(self, user_id, room_id, is_guest=False):
+        """Retrieve all state events for a given room. If the user is
+        joined to the room then return the current state. If the user has
+        left the room return the state events from when they left.
+
+        Args:
+            user_id(str): The user requesting state events.
+            room_id(str): The room ID to get all state events from.
+        Returns:
+            A list of dicts representing state events. [{}, {}, {}]
+        """
+        membership, membership_event_id = yield self._check_in_room_or_world_readable(
+            room_id, user_id
+        )
+
+        if membership == Membership.JOIN:
+            room_state = yield self.state_handler.get_current_state(room_id)
+        elif membership == Membership.LEAVE:
+            room_state = yield self.store.get_state_for_events(
+                [membership_event_id], None
+            )
+            room_state = room_state[membership_event_id]
+
+        now = self.clock.time_msec()
+        defer.returnValue(
+            [serialize_event(c, now) for c in room_state.values()]
+        )
+
+    @defer.inlineCallbacks
+    def get_joined_members(self, requester, room_id):
+        """Get all the joined members in the room and their profile information.
+
+        If the user has left the room return the state events from when they left.
+
+        Args:
+            requester(Requester): The user requesting state events.
+            room_id(str): The room ID to get all state events from.
+        Returns:
+            A dict of user_id to profile info
+        """
+        user_id = requester.user.to_string()
+        if not requester.app_service:
+            # We check AS auth after fetching the room membership, as it
+            # requires us to pull out all joined members anyway.
+            membership, _ = yield self._check_in_room_or_world_readable(
+                room_id, user_id
+            )
+            if membership != Membership.JOIN:
+                raise NotImplementedError(
+                    "Getting joined members after leaving is not implemented"
+                )
+
+        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+
+        # If this is an AS, double check that they are allowed to see the members.
+        # This can either be because the AS user is in the room or becuase there
+        # is a user in the room that the AS is "interested in"
+        if requester.app_service and user_id not in users_with_profile:
+            for uid in users_with_profile:
+                if requester.app_service.is_interested_in_user(uid):
+                    break
+            else:
+                # Loop fell through, AS has no interested users in room
+                raise AuthError(403, "Appservice not in room")
+
+        defer.returnValue({
+            user_id: {
+                "avatar_url": profile.avatar_url,
+                "display_name": profile.display_name,
+            }
+            for user_id, profile in users_with_profile.iteritems()
+        })
+
+
+class EventCreationHandler(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
+        self.clock = hs.get_clock()
+        self.validator = EventValidator()
+        self.profile_handler = hs.get_profile_handler()
+        self.event_builder_factory = hs.get_event_builder_factory()
+        self.server_name = hs.hostname
+        self.ratelimiter = hs.get_ratelimiter()
+        self.notifier = hs.get_notifier()
+        self.config = hs.config
+
+        self.http_client = hs.get_simple_http_client()
+
+        # This is only used to get at ratelimit function, and maybe_kick_guest_users
+        self.base_handler = BaseHandler(hs)
+
+        self.pusher_pool = hs.get_pusherpool()
+
+        # We arbitrarily limit concurrent event creation for a room to 5.
+        # This is to stop us from diverging history *too* much.
+        self.limiter = Limiter(max_count=5)
+
+        self.action_generator = hs.get_action_generator()
+
+        self.spam_checker = hs.get_spam_checker()
+
+    @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
                      prev_event_ids=None):
         """
@@ -234,7 +483,7 @@ class MessageHandler(BaseHandler):
             if txn_id is not None:
                 builder.internal_metadata.txn_id = txn_id
 
-            event, context = yield self._create_new_client_event(
+            event, context = yield self.create_new_client_event(
                 builder=builder,
                 requester=requester,
                 prev_event_ids=prev_event_ids,
@@ -259,11 +508,6 @@ class MessageHandler(BaseHandler):
                 "Tried to send member event through non-member codepath"
             )
 
-        # We check here if we are currently being rate limited, so that we
-        # don't do unnecessary work. We check again just before we actually
-        # send the event.
-        yield self.ratelimit(requester, update=False)
-
         user = UserID.from_string(event.sender)
 
         assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
@@ -280,12 +524,6 @@ class MessageHandler(BaseHandler):
             ratelimit=ratelimit,
         )
 
-        if event.type == EventTypes.Message:
-            presence = self.hs.get_presence_handler()
-            # We don't want to block sending messages on any presence code. This
-            # matters as sometimes presence code can take a while.
-            preserve_fn(presence.bump_presence_active_time)(user)
-
     @defer.inlineCallbacks
     def deduplicate_state_event(self, event, context):
         """
@@ -342,137 +580,9 @@ class MessageHandler(BaseHandler):
         )
         defer.returnValue(event)
 
+    @measure_func("create_new_client_event")
     @defer.inlineCallbacks
-    def get_room_data(self, user_id=None, room_id=None,
-                      event_type=None, state_key="", is_guest=False):
-        """ Get data from a room.
-
-        Args:
-            event : The room path event
-        Returns:
-            The path data content.
-        Raises:
-            SynapseError if something went wrong.
-        """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
-            room_id, user_id
-        )
-
-        if membership == Membership.JOIN:
-            data = yield self.state_handler.get_current_state(
-                room_id, event_type, state_key
-            )
-        elif membership == Membership.LEAVE:
-            key = (event_type, state_key)
-            room_state = yield self.store.get_state_for_events(
-                [membership_event_id], [key]
-            )
-            data = room_state[membership_event_id].get(key)
-
-        defer.returnValue(data)
-
-    @defer.inlineCallbacks
-    def _check_in_room_or_world_readable(self, room_id, user_id):
-        try:
-            # check_user_was_in_room will return the most recent membership
-            # event for the user if:
-            #  * The user is a non-guest user, and was ever in the room
-            #  * The user is a guest user, and has joined the room
-            # else it will throw.
-            member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
-            defer.returnValue((member_event.membership, member_event.event_id))
-            return
-        except AuthError:
-            visibility = yield self.state_handler.get_current_state(
-                room_id, EventTypes.RoomHistoryVisibility, ""
-            )
-            if (
-                visibility and
-                visibility.content["history_visibility"] == "world_readable"
-            ):
-                defer.returnValue((Membership.JOIN, None))
-                return
-            raise AuthError(
-                403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
-            )
-
-    @defer.inlineCallbacks
-    def get_state_events(self, user_id, room_id, is_guest=False):
-        """Retrieve all state events for a given room. If the user is
-        joined to the room then return the current state. If the user has
-        left the room return the state events from when they left.
-
-        Args:
-            user_id(str): The user requesting state events.
-            room_id(str): The room ID to get all state events from.
-        Returns:
-            A list of dicts representing state events. [{}, {}, {}]
-        """
-        membership, membership_event_id = yield self._check_in_room_or_world_readable(
-            room_id, user_id
-        )
-
-        if membership == Membership.JOIN:
-            room_state = yield self.state_handler.get_current_state(room_id)
-        elif membership == Membership.LEAVE:
-            room_state = yield self.store.get_state_for_events(
-                [membership_event_id], None
-            )
-            room_state = room_state[membership_event_id]
-
-        now = self.clock.time_msec()
-        defer.returnValue(
-            [serialize_event(c, now) for c in room_state.values()]
-        )
-
-    @defer.inlineCallbacks
-    def get_joined_members(self, requester, room_id):
-        """Get all the joined members in the room and their profile information.
-
-        If the user has left the room return the state events from when they left.
-
-        Args:
-            requester(Requester): The user requesting state events.
-            room_id(str): The room ID to get all state events from.
-        Returns:
-            A dict of user_id to profile info
-        """
-        user_id = requester.user.to_string()
-        if not requester.app_service:
-            # We check AS auth after fetching the room membership, as it
-            # requires us to pull out all joined members anyway.
-            membership, _ = yield self._check_in_room_or_world_readable(
-                room_id, user_id
-            )
-            if membership != Membership.JOIN:
-                raise NotImplementedError(
-                    "Getting joined members after leaving is not implemented"
-                )
-
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
-
-        # If this is an AS, double check that they are allowed to see the members.
-        # This can either be because the AS user is in the room or becuase there
-        # is a user in the room that the AS is "interested in"
-        if requester.app_service and user_id not in users_with_profile:
-            for uid in users_with_profile:
-                if requester.app_service.is_interested_in_user(uid):
-                    break
-            else:
-                # Loop fell through, AS has no interested users in room
-                raise AuthError(403, "Appservice not in room")
-
-        defer.returnValue({
-            user_id: {
-                "avatar_url": profile.avatar_url,
-                "display_name": profile.display_name,
-            }
-            for user_id, profile in users_with_profile.iteritems()
-        })
-
-    @measure_func("_create_new_client_event")
-    @defer.inlineCallbacks
-    def _create_new_client_event(self, builder, requester=None, prev_event_ids=None):
+    def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
         if prev_event_ids:
             prev_events = yield self.store.add_event_hashes(prev_event_ids)
             prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
@@ -509,9 +619,7 @@ class MessageHandler(BaseHandler):
         builder.prev_events = prev_events
         builder.depth = depth
 
-        state_handler = self.state_handler
-
-        context = yield state_handler.compute_event_context(builder)
+        context = yield self.state.compute_event_context(builder)
         if requester:
             context.app_service = requester.app_service
 
@@ -546,12 +654,21 @@ class MessageHandler(BaseHandler):
         event,
         context,
         ratelimit=True,
-        extra_users=[]
+        extra_users=[],
     ):
-        # We now need to go and hit out to wherever we need to hit out to.
+        """Processes a new event. This includes checking auth, persisting it,
+        notifying users, sending to remote servers, etc.
 
-        if ratelimit:
-            yield self.ratelimit(requester)
+        If called from a worker will hit out to the master process for final
+        processing.
+
+        Args:
+            requester (Requester)
+            event (FrozenEvent)
+            context (EventContext)
+            ratelimit (bool)
+            extra_users (list(UserID)): Any extra users to notify about event
+        """
 
         try:
             yield self.auth.check_from_context(event, context)
@@ -567,7 +684,58 @@ class MessageHandler(BaseHandler):
             logger.exception("Failed to encode content: %r", event.content)
             raise
 
-        yield self.maybe_kick_guest_users(event, context)
+        yield self.action_generator.handle_push_actions_for_event(
+            event, context
+        )
+
+        try:
+            # If we're a worker we need to hit out to the master.
+            if self.config.worker_app:
+                yield send_event_to_master(
+                    self.http_client,
+                    host=self.config.worker_replication_host,
+                    port=self.config.worker_replication_http_port,
+                    requester=requester,
+                    event=event,
+                    context=context,
+                    ratelimit=ratelimit,
+                    extra_users=extra_users,
+                )
+                return
+
+            yield self.persist_and_notify_client_event(
+                requester,
+                event,
+                context,
+                ratelimit=ratelimit,
+                extra_users=extra_users,
+            )
+        except:  # noqa: E722, as we reraise the exception this is fine.
+            # Ensure that we actually remove the entries in the push actions
+            # staging area, if we calculated them.
+            preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
+            raise
+
+    @defer.inlineCallbacks
+    def persist_and_notify_client_event(
+        self,
+        requester,
+        event,
+        context,
+        ratelimit=True,
+        extra_users=[],
+    ):
+        """Called when we have fully built the event, have already
+        calculated the push actions for the event, and checked auth.
+
+        This should only be run on master.
+        """
+        assert not self.config.worker_app
+
+        if ratelimit:
+            yield self.base_handler.ratelimit(requester)
+
+        yield self.base_handler.maybe_kick_guest_users(event, context)
 
         if event.type == EventTypes.CanonicalAlias:
             # Check the alias is acually valid (at this time at least)
@@ -660,10 +828,6 @@ class MessageHandler(BaseHandler):
                 "Changing the room create event is forbidden",
             )
 
-        yield self.action_generator.handle_push_actions_for_event(
-            event, context
-        )
-
         (event_stream_id, max_stream_id) = yield self.store.persist_event(
             event, context=context
         )
@@ -683,3 +847,9 @@ class MessageHandler(BaseHandler):
             )
 
         preserve_fn(_notify)()
+
+        if event.type == EventTypes.Message:
+            presence = self.hs.get_presence_handler()
+            # We don't want to block sending messages on any presence code. This
+            # matters as sometimes presence code can take a while.
+            preserve_fn(presence.bump_presence_active_time)(requester.user)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index cb158ba962..a5e501897c 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -93,29 +93,30 @@ class PresenceHandler(object):
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
         self.notifier = hs.get_notifier()
-        self.replication = hs.get_replication_layer()
         self.federation = hs.get_federation_sender()
 
         self.state = hs.get_state_handler()
 
-        self.replication.register_edu_handler(
+        federation_registry = hs.get_federation_registry()
+
+        federation_registry.register_edu_handler(
             "m.presence", self.incoming_presence
         )
-        self.replication.register_edu_handler(
+        federation_registry.register_edu_handler(
             "m.presence_invite",
             lambda origin, content: self.invite_presence(
                 observed_user=UserID.from_string(content["observed_user"]),
                 observer_user=UserID.from_string(content["observer_user"]),
             )
         )
-        self.replication.register_edu_handler(
+        federation_registry.register_edu_handler(
             "m.presence_accept",
             lambda origin, content: self.accept_presence(
                 observed_user=UserID.from_string(content["observed_user"]),
                 observer_user=UserID.from_string(content["observer_user"]),
             )
         )
-        self.replication.register_edu_handler(
+        federation_registry.register_edu_handler(
             "m.presence_deny",
             lambda origin, content: self.deny_presence(
                 observed_user=UserID.from_string(content["observed_user"]),
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 9800e24453..3465a787ab 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -31,14 +31,17 @@ class ProfileHandler(BaseHandler):
     def __init__(self, hs):
         super(ProfileHandler, self).__init__(hs)
 
-        self.federation = hs.get_replication_layer()
-        self.federation.register_query_handler(
+        self.federation = hs.get_federation_client()
+        hs.get_federation_registry().register_query_handler(
             "profile", self.on_profile_query
         )
 
         self.user_directory_handler = hs.get_user_directory_handler()
 
-        self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS)
+        if hs.config.worker_app is None:
+            self.clock.looping_call(
+                self._update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+            )
 
     @defer.inlineCallbacks
     def get_profile(self, user_id):
@@ -233,7 +236,7 @@ class ProfileHandler(BaseHandler):
         )
 
         for room_id in room_ids:
-            handler = self.hs.get_handlers().room_member_handler
+            handler = self.hs.get_room_member_handler()
             try:
                 # Assume the target_user isn't a guest,
                 # because we don't let guests set profile or avatar data.
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index b5b0303d54..5142ae153d 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -41,9 +41,9 @@ class ReadMarkerHandler(BaseHandler):
         """
 
         with (yield self.read_marker_linearizer.queue((room_id, user_id))):
-            account_data = yield self.store.get_account_data_for_room(user_id, room_id)
-
-            existing_read_marker = account_data.get("m.fully_read", None)
+            existing_read_marker = yield self.store.get_account_data_for_room_and_type(
+                user_id, room_id, "m.fully_read",
+            )
 
             should_update = True
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 0525765272..3f215c2b4e 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -35,7 +35,7 @@ class ReceiptsHandler(BaseHandler):
         self.store = hs.get_datastore()
         self.hs = hs
         self.federation = hs.get_federation_sender()
-        hs.get_replication_layer().register_edu_handler(
+        hs.get_federation_registry().register_edu_handler(
             "m.receipt", self._received_remote_receipt
         )
         self.clock = self.hs.get_clock()
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 4bc6ef51fe..ed5939880a 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -25,6 +25,7 @@ from synapse.http.client import CaptchaServerHttpClient
 from synapse import types
 from synapse.types import UserID
 from synapse.util.async import run_on_reactor
+from synapse.util.threepids import check_3pid_allowed
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
@@ -131,7 +132,7 @@ class RegistrationHandler(BaseHandler):
         yield run_on_reactor()
         password_hash = None
         if password:
-            password_hash = self.auth_handler().hash(password)
+            password_hash = yield self.auth_handler().hash(password)
 
         if localpart:
             yield self.check_username(localpart, guest_access_token=guest_access_token)
@@ -293,7 +294,7 @@ class RegistrationHandler(BaseHandler):
         """
 
         for c in threepidCreds:
-            logger.info("validating theeepidcred sid %s on id server %s",
+            logger.info("validating threepidcred sid %s on id server %s",
                         c['sid'], c['idServer'])
             try:
                 identity_handler = self.hs.get_handlers().identity_handler
@@ -307,6 +308,11 @@ class RegistrationHandler(BaseHandler):
             logger.info("got threepid with medium '%s' and address '%s'",
                         threepid['medium'], threepid['address'])
 
+            if not check_3pid_allowed(self.hs, threepid['medium'], threepid['address']):
+                raise RegistrationError(
+                    403, "Third party identifier is not allowed"
+                )
+
     @defer.inlineCallbacks
     def bind_emails(self, user_id, threepidCreds):
         """Links emails with a user ID and informs an identity server.
@@ -440,16 +446,34 @@ class RegistrationHandler(BaseHandler):
         return self.hs.get_auth_handler()
 
     @defer.inlineCallbacks
-    def guest_access_token_for(self, medium, address, inviter_user_id):
+    def get_or_register_3pid_guest(self, medium, address, inviter_user_id):
+        """Get a guest access token for a 3PID, creating a guest account if
+        one doesn't already exist.
+
+        Args:
+            medium (str)
+            address (str)
+            inviter_user_id (str): The user ID who is trying to invite the
+                3PID
+
+        Returns:
+            Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
+            3PID guest account.
+        """
         access_token = yield self.store.get_3pid_guest_access_token(medium, address)
         if access_token:
-            defer.returnValue(access_token)
+            user_info = yield self.auth.get_user_by_access_token(
+                access_token
+            )
 
-        _, access_token = yield self.register(
+            defer.returnValue((user_info["user"].to_string(), access_token))
+
+        user_id, access_token = yield self.register(
             generate_token=True,
             make_guest=True
         )
         access_token = yield self.store.save_or_get_3pid_guest_access_token(
             medium, address, access_token, inviter_user_id
         )
-        defer.returnValue(access_token)
+
+        defer.returnValue((user_id, access_token))
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index d1cc87a016..8df8fcbbad 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -64,6 +65,7 @@ class RoomCreationHandler(BaseHandler):
         super(RoomCreationHandler, self).__init__(hs)
 
         self.spam_checker = hs.get_spam_checker()
+        self.event_creation_handler = hs.get_event_creation_handler()
 
     @defer.inlineCallbacks
     def create_room(self, requester, config, ratelimit=True):
@@ -163,13 +165,11 @@ class RoomCreationHandler(BaseHandler):
 
         creation_content = config.get("creation_content", {})
 
-        msg_handler = self.hs.get_handlers().message_handler
-        room_member_handler = self.hs.get_handlers().room_member_handler
+        room_member_handler = self.hs.get_room_member_handler()
 
         yield self._send_events_for_new_room(
             requester,
             room_id,
-            msg_handler,
             room_member_handler,
             preset_config=preset_config,
             invite_list=invite_list,
@@ -181,7 +181,7 @@ class RoomCreationHandler(BaseHandler):
 
         if "name" in config:
             name = config["name"]
-            yield msg_handler.create_and_send_nonmember_event(
+            yield self.event_creation_handler.create_and_send_nonmember_event(
                 requester,
                 {
                     "type": EventTypes.Name,
@@ -194,7 +194,7 @@ class RoomCreationHandler(BaseHandler):
 
         if "topic" in config:
             topic = config["topic"]
-            yield msg_handler.create_and_send_nonmember_event(
+            yield self.event_creation_handler.create_and_send_nonmember_event(
                 requester,
                 {
                     "type": EventTypes.Topic,
@@ -224,7 +224,7 @@ class RoomCreationHandler(BaseHandler):
             id_server = invite_3pid["id_server"]
             address = invite_3pid["address"]
             medium = invite_3pid["medium"]
-            yield self.hs.get_handlers().room_member_handler.do_3pid_invite(
+            yield self.hs.get_room_member_handler().do_3pid_invite(
                 room_id,
                 requester.user,
                 medium,
@@ -249,7 +249,6 @@ class RoomCreationHandler(BaseHandler):
             self,
             creator,  # A Requester object.
             room_id,
-            msg_handler,
             room_member_handler,
             preset_config,
             invite_list,
@@ -272,7 +271,7 @@ class RoomCreationHandler(BaseHandler):
         @defer.inlineCallbacks
         def send(etype, content, **kwargs):
             event = create(etype, content, **kwargs)
-            yield msg_handler.create_and_send_nonmember_event(
+            yield self.event_creation_handler.create_and_send_nonmember_event(
                 creator,
                 event,
                 ratelimit=False
@@ -476,12 +475,9 @@ class RoomEventSource(object):
             user.to_string()
         )
         if app_service:
-            events, end_key = yield self.store.get_appservice_room_stream(
-                service=app_service,
-                from_key=from_key,
-                to_key=to_key,
-                limit=limit,
-            )
+            # We no longer support AS users using /sync directly.
+            # See https://github.com/matrix-org/matrix-doc/issues/1144
+            raise NotImplementedError()
         else:
             room_events = yield self.store.get_membership_changes_for_user(
                 user.to_string(), from_key, to_key
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index bb40075387..5d81f59b44 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -203,7 +203,8 @@ class RoomListHandler(BaseHandler):
         if limit:
             step = limit + 1
         else:
-            step = len(rooms_to_scan)
+            # step cannot be zero
+            step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
 
         chunk = []
         for i in xrange(0, len(rooms_to_scan), step):
@@ -408,7 +409,7 @@ class RoomListHandler(BaseHandler):
     def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
                                 search_filter=None, include_all_networks=False,
                                 third_party_instance_id=None,):
-        repl_layer = self.hs.get_replication_layer()
+        repl_layer = self.hs.get_federation_client()
         if search_filter:
             # We can't cache when asking for search
             return repl_layer.get_public_rooms(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 7e6467cd1d..9977be8831 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+import abc
 import logging
 
 from signedjson.key import decode_verify_key_bytes
@@ -29,32 +30,121 @@ from synapse.api.errors import AuthError, SynapseError, Codes
 from synapse.types import UserID, RoomID
 from synapse.util.async import Linearizer
 from synapse.util.distributor import user_left_room, user_joined_room
-from ._base import BaseHandler
+
 
 logger = logging.getLogger(__name__)
 
 id_server_scheme = "https://"
 
 
-class RoomMemberHandler(BaseHandler):
+class RoomMemberHandler(object):
     # TODO(paul): This handler currently contains a messy conflation of
     #   low-level API that works on UserID objects and so on, and REST-level
     #   API that takes ID strings and returns pagination chunks. These concerns
     #   ought to be separated out a lot better.
 
-    def __init__(self, hs):
-        super(RoomMemberHandler, self).__init__(hs)
+    __metaclass__ = abc.ABCMeta
 
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+        self.auth = hs.get_auth()
+        self.state_handler = hs.get_state_handler()
+        self.config = hs.config
+        self.simple_http_client = hs.get_simple_http_client()
+
+        self.federation_handler = hs.get_handlers().federation_handler
+        self.directory_handler = hs.get_handlers().directory_handler
+        self.registration_handler = hs.get_handlers().registration_handler
         self.profile_handler = hs.get_profile_handler()
+        self.event_creation_hander = hs.get_event_creation_handler()
 
         self.member_linearizer = Linearizer(name="member")
 
         self.clock = hs.get_clock()
         self.spam_checker = hs.get_spam_checker()
 
-        self.distributor = hs.get_distributor()
-        self.distributor.declare("user_joined_room")
-        self.distributor.declare("user_left_room")
+    @abc.abstractmethod
+    def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
+        """Try and join a room that this server is not in
+
+        Args:
+            requester (Requester)
+            remote_room_hosts (list[str]): List of servers that can be used
+                to join via.
+            room_id (str): Room that we are trying to join
+            user (UserID): User who is trying to join
+            content (dict): A dict that should be used as the content of the
+                join event.
+
+        Returns:
+            Deferred
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def _remote_reject_invite(self, remote_room_hosts, room_id, target):
+        """Attempt to reject an invite for a room this server is not in. If we
+        fail to do so we locally mark the invite as rejected.
+
+        Args:
+            requester (Requester)
+            remote_room_hosts (list[str]): List of servers to use to try and
+                reject invite
+            room_id (str)
+            target (UserID): The user rejecting the invite
+
+        Returns:
+            Deferred[dict]: A dictionary to be returned to the client, may
+            include event_id etc, or nothing if we locally rejected
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
+        """Get a guest access token for a 3PID, creating a guest account if
+        one doesn't already exist.
+
+        Args:
+            requester (Requester)
+            medium (str)
+            address (str)
+            inviter_user_id (str): The user ID who is trying to invite the
+                3PID
+
+        Returns:
+            Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
+            3PID guest account.
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def _user_joined_room(self, target, room_id):
+        """Notifies distributor on master process that the user has joined the
+        room.
+
+        Args:
+            target (UserID)
+            room_id (str)
+
+        Returns:
+            Deferred|None
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def _user_left_room(self, target, room_id):
+        """Notifies distributor on master process that the user has left the
+        room.
+
+        Args:
+            target (UserID)
+            room_id (str)
+
+        Returns:
+            Deferred|None
+        """
+        raise NotImplementedError()
 
     @defer.inlineCallbacks
     def _local_membership_update(
@@ -66,13 +156,12 @@ class RoomMemberHandler(BaseHandler):
     ):
         if content is None:
             content = {}
-        msg_handler = self.hs.get_handlers().message_handler
 
         content["membership"] = membership
         if requester.is_guest:
             content["kind"] = "guest"
 
-        event, context = yield msg_handler.create_event(
+        event, context = yield self.event_creation_hander.create_event(
             requester,
             {
                 "type": EventTypes.Member,
@@ -90,12 +179,14 @@ class RoomMemberHandler(BaseHandler):
         )
 
         # Check if this event matches the previous membership event for the user.
-        duplicate = yield msg_handler.deduplicate_state_event(event, context)
+        duplicate = yield self.event_creation_hander.deduplicate_state_event(
+            event, context,
+        )
         if duplicate is not None:
             # Discard the new event since this membership change is a no-op.
             defer.returnValue(duplicate)
 
-        yield msg_handler.handle_new_client_event(
+        yield self.event_creation_hander.handle_new_client_event(
             requester,
             event,
             context,
@@ -117,33 +208,16 @@ class RoomMemberHandler(BaseHandler):
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
                 newly_joined = prev_member_event.membership != Membership.JOIN
             if newly_joined:
-                yield user_joined_room(self.distributor, target, room_id)
+                yield self._user_joined_room(target, room_id)
         elif event.membership == Membership.LEAVE:
             if prev_member_event_id:
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
                 if prev_member_event.membership == Membership.JOIN:
-                    user_left_room(self.distributor, target, room_id)
+                    yield self._user_left_room(target, room_id)
 
         defer.returnValue(event)
 
     @defer.inlineCallbacks
-    def remote_join(self, remote_room_hosts, room_id, user, content):
-        if len(remote_room_hosts) == 0:
-            raise SynapseError(404, "No known servers")
-
-        # We don't do an auth check if we are doing an invite
-        # join dance for now, since we're kinda implicitly checking
-        # that we are allowed to join when we decide whether or not we
-        # need to do the invite/join dance.
-        yield self.hs.get_handlers().federation_handler.do_invite_join(
-            remote_room_hosts,
-            room_id,
-            user.to_string(),
-            content,
-        )
-        yield user_joined_room(self.distributor, user, room_id)
-
-    @defer.inlineCallbacks
     def update_membership(
             self,
             requester,
@@ -201,8 +275,7 @@ class RoomMemberHandler(BaseHandler):
         # if this is a join with a 3pid signature, we may need to turn a 3pid
         # invite into a normal invite before we can handle the join.
         if third_party_signed is not None:
-            replication = self.hs.get_replication_layer()
-            yield replication.exchange_third_party_invite(
+            yield self.federation_handler.exchange_third_party_invite(
                 third_party_signed["sender"],
                 target.to_string(),
                 room_id,
@@ -223,7 +296,7 @@ class RoomMemberHandler(BaseHandler):
                 requester.user,
             )
             if not is_requester_admin:
-                if self.hs.config.block_non_admin_invites:
+                if self.config.block_non_admin_invites:
                     logger.info(
                         "Blocking invite: user is not admin and non-admin "
                         "invites disabled"
@@ -282,7 +355,7 @@ class RoomMemberHandler(BaseHandler):
                     raise AuthError(403, "Guest access not allowed")
 
             if not is_host_in_room:
-                inviter = yield self.get_inviter(target.to_string(), room_id)
+                inviter = yield self._get_inviter(target.to_string(), room_id)
                 if inviter and not self.hs.is_mine(inviter):
                     remote_room_hosts.append(inviter.domain)
 
@@ -296,15 +369,15 @@ class RoomMemberHandler(BaseHandler):
                 if requester.is_guest:
                     content["kind"] = "guest"
 
-                ret = yield self.remote_join(
-                    remote_room_hosts, room_id, target, content
+                ret = yield self._remote_join(
+                    requester, remote_room_hosts, room_id, target, content
                 )
                 defer.returnValue(ret)
 
         elif effective_membership_state == Membership.LEAVE:
             if not is_host_in_room:
                 # perhaps we've been invited
-                inviter = yield self.get_inviter(target.to_string(), room_id)
+                inviter = yield self._get_inviter(target.to_string(), room_id)
                 if not inviter:
                     raise SynapseError(404, "Not a known room")
 
@@ -318,28 +391,10 @@ class RoomMemberHandler(BaseHandler):
                 else:
                     # send the rejection to the inviter's HS.
                     remote_room_hosts = remote_room_hosts + [inviter.domain]
-                    fed_handler = self.hs.get_handlers().federation_handler
-                    try:
-                        ret = yield fed_handler.do_remotely_reject_invite(
-                            remote_room_hosts,
-                            room_id,
-                            target.to_string(),
-                        )
-                        defer.returnValue(ret)
-                    except Exception as e:
-                        # if we were unable to reject the exception, just mark
-                        # it as rejected on our end and plough ahead.
-                        #
-                        # The 'except' clause is very broad, but we need to
-                        # capture everything from DNS failures upwards
-                        #
-                        logger.warn("Failed to reject invite: %s", e)
-
-                        yield self.store.locally_reject_invite(
-                            target.to_string(), room_id
-                        )
-
-                        defer.returnValue({})
+                    res = yield self._remote_reject_invite(
+                        requester, remote_room_hosts, room_id, target,
+                    )
+                    defer.returnValue(res)
 
         res = yield self._local_membership_update(
             requester=requester,
@@ -394,8 +449,9 @@ class RoomMemberHandler(BaseHandler):
         else:
             requester = synapse.types.create_requester(target_user)
 
-        message_handler = self.hs.get_handlers().message_handler
-        prev_event = yield message_handler.deduplicate_state_event(event, context)
+        prev_event = yield self.event_creation_hander.deduplicate_state_event(
+            event, context,
+        )
         if prev_event is not None:
             return
 
@@ -412,7 +468,7 @@ class RoomMemberHandler(BaseHandler):
             if is_blocked:
                 raise SynapseError(403, "This room has been blocked on this server")
 
-        yield message_handler.handle_new_client_event(
+        yield self.event_creation_hander.handle_new_client_event(
             requester,
             event,
             context,
@@ -434,12 +490,12 @@ class RoomMemberHandler(BaseHandler):
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
                 newly_joined = prev_member_event.membership != Membership.JOIN
             if newly_joined:
-                yield user_joined_room(self.distributor, target_user, room_id)
+                yield self._user_joined_room(target_user, room_id)
         elif event.membership == Membership.LEAVE:
             if prev_member_event_id:
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
                 if prev_member_event.membership == Membership.JOIN:
-                    user_left_room(self.distributor, target_user, room_id)
+                    yield self._user_left_room(target_user, room_id)
 
     @defer.inlineCallbacks
     def _can_guest_join(self, current_state_ids):
@@ -473,7 +529,7 @@ class RoomMemberHandler(BaseHandler):
         Raises:
             SynapseError if room alias could not be found.
         """
-        directory_handler = self.hs.get_handlers().directory_handler
+        directory_handler = self.directory_handler
         mapping = yield directory_handler.get_association(room_alias)
 
         if not mapping:
@@ -485,7 +541,7 @@ class RoomMemberHandler(BaseHandler):
         defer.returnValue((RoomID.from_string(room_id), servers))
 
     @defer.inlineCallbacks
-    def get_inviter(self, user_id, room_id):
+    def _get_inviter(self, user_id, room_id):
         invite = yield self.store.get_invite_for_user_in_room(
             user_id=user_id,
             room_id=room_id,
@@ -504,7 +560,7 @@ class RoomMemberHandler(BaseHandler):
             requester,
             txn_id
     ):
-        if self.hs.config.block_non_admin_invites:
+        if self.config.block_non_admin_invites:
             is_requester_admin = yield self.auth.is_server_admin(
                 requester.user,
             )
@@ -551,7 +607,7 @@ class RoomMemberHandler(BaseHandler):
             str: the matrix ID of the 3pid, or None if it is not recognized.
         """
         try:
-            data = yield self.hs.get_simple_http_client().get_json(
+            data = yield self.simple_http_client.get_json(
                 "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
                 {
                     "medium": medium,
@@ -562,7 +618,7 @@ class RoomMemberHandler(BaseHandler):
             if "mxid" in data:
                 if "signatures" not in data:
                     raise AuthError(401, "No signatures on 3pid binding")
-                self.verify_any_signature(data, id_server)
+                yield self._verify_any_signature(data, id_server)
                 defer.returnValue(data["mxid"])
 
         except IOError as e:
@@ -570,11 +626,11 @@ class RoomMemberHandler(BaseHandler):
             defer.returnValue(None)
 
     @defer.inlineCallbacks
-    def verify_any_signature(self, data, server_hostname):
+    def _verify_any_signature(self, data, server_hostname):
         if server_hostname not in data["signatures"]:
             raise AuthError(401, "No signature from server %s" % (server_hostname,))
         for key_name, signature in data["signatures"][server_hostname].items():
-            key_data = yield self.hs.get_simple_http_client().get_json(
+            key_data = yield self.simple_http_client.get_json(
                 "%s%s/_matrix/identity/api/v1/pubkey/%s" %
                 (id_server_scheme, server_hostname, key_name,),
             )
@@ -599,7 +655,7 @@ class RoomMemberHandler(BaseHandler):
             user,
             txn_id
     ):
-        room_state = yield self.hs.get_state_handler().get_current_state(room_id)
+        room_state = yield self.state_handler.get_current_state(room_id)
 
         inviter_display_name = ""
         inviter_avatar_url = ""
@@ -630,6 +686,7 @@ class RoomMemberHandler(BaseHandler):
 
         token, public_keys, fallback_public_key, display_name = (
             yield self._ask_id_server_for_third_party_invite(
+                requester=requester,
                 id_server=id_server,
                 medium=medium,
                 address=address,
@@ -644,8 +701,7 @@ class RoomMemberHandler(BaseHandler):
             )
         )
 
-        msg_handler = self.hs.get_handlers().message_handler
-        yield msg_handler.create_and_send_nonmember_event(
+        yield self.event_creation_hander.create_and_send_nonmember_event(
             requester,
             {
                 "type": EventTypes.ThirdPartyInvite,
@@ -667,6 +723,7 @@ class RoomMemberHandler(BaseHandler):
     @defer.inlineCallbacks
     def _ask_id_server_for_third_party_invite(
             self,
+            requester,
             id_server,
             medium,
             address,
@@ -683,6 +740,7 @@ class RoomMemberHandler(BaseHandler):
         Asks an identity server for a third party invite.
 
         Args:
+            requester (Requester)
             id_server (str): hostname + optional port for the identity server.
             medium (str): The literal string "email".
             address (str): The third party address being invited.
@@ -724,24 +782,20 @@ class RoomMemberHandler(BaseHandler):
             "sender_avatar_url": inviter_avatar_url,
         }
 
-        if self.hs.config.invite_3pid_guest:
-            registration_handler = self.hs.get_handlers().registration_handler
-            guest_access_token = yield registration_handler.guest_access_token_for(
+        if self.config.invite_3pid_guest:
+            guest_access_token, guest_user_id = yield self.get_or_register_3pid_guest(
+                requester=requester,
                 medium=medium,
                 address=address,
                 inviter_user_id=inviter_user_id,
             )
 
-            guest_user_info = yield self.hs.get_auth().get_user_by_access_token(
-                guest_access_token
-            )
-
             invite_config.update({
                 "guest_access_token": guest_access_token,
-                "guest_user_id": guest_user_info["user"].to_string(),
+                "guest_user_id": guest_user_id,
             })
 
-        data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
+        data = yield self.simple_http_client.post_urlencoded_get_json(
             is_url,
             invite_config
         )
@@ -764,27 +818,6 @@ class RoomMemberHandler(BaseHandler):
         defer.returnValue((token, public_keys, fallback_public_key, display_name))
 
     @defer.inlineCallbacks
-    def forget(self, user, room_id):
-        user_id = user.to_string()
-
-        member = yield self.state_handler.get_current_state(
-            room_id=room_id,
-            event_type=EventTypes.Member,
-            state_key=user_id
-        )
-        membership = member.membership if member else None
-
-        if membership is not None and membership not in [
-            Membership.LEAVE, Membership.BAN
-        ]:
-            raise SynapseError(400, "User %s in room %s" % (
-                user_id, room_id
-            ))
-
-        if membership:
-            yield self.store.forget(user_id, room_id)
-
-    @defer.inlineCallbacks
     def _is_host_in_room(self, current_state_ids):
         # Have we just created the room, and is this about to be the very
         # first member event?
@@ -805,3 +838,94 @@ class RoomMemberHandler(BaseHandler):
                 defer.returnValue(True)
 
         defer.returnValue(False)
+
+
+class RoomMemberMasterHandler(RoomMemberHandler):
+    def __init__(self, hs):
+        super(RoomMemberMasterHandler, self).__init__(hs)
+
+        self.distributor = hs.get_distributor()
+        self.distributor.declare("user_joined_room")
+        self.distributor.declare("user_left_room")
+
+    @defer.inlineCallbacks
+    def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
+        """Implements RoomMemberHandler._remote_join
+        """
+        if len(remote_room_hosts) == 0:
+            raise SynapseError(404, "No known servers")
+
+        # We don't do an auth check if we are doing an invite
+        # join dance for now, since we're kinda implicitly checking
+        # that we are allowed to join when we decide whether or not we
+        # need to do the invite/join dance.
+        yield self.federation_handler.do_invite_join(
+            remote_room_hosts,
+            room_id,
+            user.to_string(),
+            content,
+        )
+        yield self._user_joined_room(user, room_id)
+
+    @defer.inlineCallbacks
+    def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+        """Implements RoomMemberHandler._remote_reject_invite
+        """
+        fed_handler = self.federation_handler
+        try:
+            ret = yield fed_handler.do_remotely_reject_invite(
+                remote_room_hosts,
+                room_id,
+                target.to_string(),
+            )
+            defer.returnValue(ret)
+        except Exception as e:
+            # if we were unable to reject the exception, just mark
+            # it as rejected on our end and plough ahead.
+            #
+            # The 'except' clause is very broad, but we need to
+            # capture everything from DNS failures upwards
+            #
+            logger.warn("Failed to reject invite: %s", e)
+
+            yield self.store.locally_reject_invite(
+                target.to_string(), room_id
+            )
+            defer.returnValue({})
+
+    def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
+        """Implements RoomMemberHandler.get_or_register_3pid_guest
+        """
+        rg = self.registration_handler
+        return rg.get_or_register_3pid_guest(medium, address, inviter_user_id)
+
+    def _user_joined_room(self, target, room_id):
+        """Implements RoomMemberHandler._user_joined_room
+        """
+        return user_joined_room(self.distributor, target, room_id)
+
+    def _user_left_room(self, target, room_id):
+        """Implements RoomMemberHandler._user_left_room
+        """
+        return user_left_room(self.distributor, target, room_id)
+
+    @defer.inlineCallbacks
+    def forget(self, user, room_id):
+        user_id = user.to_string()
+
+        member = yield self.state_handler.get_current_state(
+            room_id=room_id,
+            event_type=EventTypes.Member,
+            state_key=user_id
+        )
+        membership = member.membership if member else None
+
+        if membership is not None and membership not in [
+            Membership.LEAVE, Membership.BAN
+        ]:
+            raise SynapseError(400, "User %s in room %s" % (
+                user_id, room_id
+            ))
+
+        if membership:
+            yield self.store.forget(user_id, room_id)
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
new file mode 100644
index 0000000000..493aec1e48
--- /dev/null
+++ b/synapse/handlers/room_member_worker.py
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+from synapse.handlers.room_member import RoomMemberHandler
+from synapse.replication.http.membership import (
+    remote_join, remote_reject_invite, get_or_register_3pid_guest,
+    notify_user_membership_change,
+)
+
+
+logger = logging.getLogger(__name__)
+
+
+class RoomMemberWorkerHandler(RoomMemberHandler):
+    @defer.inlineCallbacks
+    def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
+        """Implements RoomMemberHandler._remote_join
+        """
+        if len(remote_room_hosts) == 0:
+            raise SynapseError(404, "No known servers")
+
+        ret = yield remote_join(
+            self.simple_http_client,
+            host=self.config.worker_replication_host,
+            port=self.config.worker_replication_http_port,
+            requester=requester,
+            remote_room_hosts=remote_room_hosts,
+            room_id=room_id,
+            user_id=user.to_string(),
+            content=content,
+        )
+
+        yield self._user_joined_room(user, room_id)
+
+        defer.returnValue(ret)
+
+    def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+        """Implements RoomMemberHandler._remote_reject_invite
+        """
+        return remote_reject_invite(
+            self.simple_http_client,
+            host=self.config.worker_replication_host,
+            port=self.config.worker_replication_http_port,
+            requester=requester,
+            remote_room_hosts=remote_room_hosts,
+            room_id=room_id,
+            user_id=target.to_string(),
+        )
+
+    def _user_joined_room(self, target, room_id):
+        """Implements RoomMemberHandler._user_joined_room
+        """
+        return notify_user_membership_change(
+            self.simple_http_client,
+            host=self.config.worker_replication_host,
+            port=self.config.worker_replication_http_port,
+            user_id=target.to_string(),
+            room_id=room_id,
+            change="joined",
+        )
+
+    def _user_left_room(self, target, room_id):
+        """Implements RoomMemberHandler._user_left_room
+        """
+        return notify_user_membership_change(
+            self.simple_http_client,
+            host=self.config.worker_replication_host,
+            port=self.config.worker_replication_http_port,
+            user_id=target.to_string(),
+            room_id=room_id,
+            change="left",
+        )
+
+    def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
+        """Implements RoomMemberHandler.get_or_register_3pid_guest
+        """
+        return get_or_register_3pid_guest(
+            self.simple_http_client,
+            host=self.config.worker_replication_host,
+            port=self.config.worker_replication_http_port,
+            requester=requester,
+            medium=medium,
+            address=address,
+            inviter_user_id=inviter_user_id,
+        )
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
index 44414e1dc1..e057ae54c9 100644
--- a/synapse/handlers/set_password.py
+++ b/synapse/handlers/set_password.py
@@ -31,7 +31,7 @@ class SetPasswordHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def set_password(self, user_id, newpassword, requester=None):
-        password_hash = self._auth_handler.hash(newpassword)
+        password_hash = yield self._auth_handler.hash(newpassword)
 
         except_device_id = requester.device_id if requester else None
         except_access_token_id = requester.access_token_id if requester else None
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b12988f3c9..0f713ce038 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -235,10 +235,10 @@ class SyncHandler(object):
         defer.returnValue(rules)
 
     @defer.inlineCallbacks
-    def ephemeral_by_room(self, sync_config, now_token, since_token=None):
+    def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None):
         """Get the ephemeral events for each room the user is in
         Args:
-            sync_config (SyncConfig): The flags, filters and user for the sync.
+            sync_result_builder(SyncResultBuilder)
             now_token (StreamToken): Where the server is currently up to.
             since_token (StreamToken): Where the server was when the client
                 last synced.
@@ -248,10 +248,12 @@ class SyncHandler(object):
             typing events for that room.
         """
 
+        sync_config = sync_result_builder.sync_config
+
         with Measure(self.clock, "ephemeral_by_room"):
             typing_key = since_token.typing_key if since_token else "0"
 
-            room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+            room_ids = sync_result_builder.joined_room_ids
 
             typing_source = self.event_sources.sources["typing"]
             typing, typing_key = yield typing_source.get_new_events(
@@ -565,10 +567,22 @@ class SyncHandler(object):
         # Always use the `now_token` in `SyncResultBuilder`
         now_token = yield self.event_sources.get_current_token()
 
+        user_id = sync_config.user.to_string()
+        app_service = self.store.get_app_service_by_user_id(user_id)
+        if app_service:
+            # We no longer support AS users using /sync directly.
+            # See https://github.com/matrix-org/matrix-doc/issues/1144
+            raise NotImplementedError()
+        else:
+            joined_room_ids = yield self.get_rooms_for_user_at(
+                user_id, now_token.room_stream_id,
+            )
+
         sync_result_builder = SyncResultBuilder(
             sync_config, full_state,
             since_token=since_token,
             now_token=now_token,
+            joined_room_ids=joined_room_ids,
         )
 
         account_data_by_room = yield self._generate_sync_entry_for_account_data(
@@ -603,7 +617,6 @@ class SyncHandler(object):
         device_id = sync_config.device_id
         one_time_key_counts = {}
         if device_id:
-            user_id = sync_config.user.to_string()
             one_time_key_counts = yield self.store.count_e2e_one_time_keys(
                 user_id, device_id
             )
@@ -891,7 +904,7 @@ class SyncHandler(object):
             ephemeral_by_room = {}
         else:
             now_token, ephemeral_by_room = yield self.ephemeral_by_room(
-                sync_result_builder.sync_config,
+                sync_result_builder,
                 now_token=sync_result_builder.now_token,
                 since_token=sync_result_builder.since_token,
             )
@@ -996,15 +1009,8 @@ class SyncHandler(object):
         if rooms_changed:
             defer.returnValue(True)
 
-        app_service = self.store.get_app_service_by_user_id(user_id)
-        if app_service:
-            rooms = yield self.store.get_app_service_rooms(app_service)
-            joined_room_ids = set(r.room_id for r in rooms)
-        else:
-            joined_room_ids = yield self.store.get_rooms_for_user(user_id)
-
         stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
-        for room_id in joined_room_ids:
+        for room_id in sync_result_builder.joined_room_ids:
             if self.store.has_room_changed_since(room_id, stream_id):
                 defer.returnValue(True)
         defer.returnValue(False)
@@ -1028,13 +1034,6 @@ class SyncHandler(object):
 
         assert since_token
 
-        app_service = self.store.get_app_service_by_user_id(user_id)
-        if app_service:
-            rooms = yield self.store.get_app_service_rooms(app_service)
-            joined_room_ids = set(r.room_id for r in rooms)
-        else:
-            joined_room_ids = yield self.store.get_rooms_for_user(user_id)
-
         # Get a list of membership change events that have happened.
         rooms_changed = yield self.store.get_membership_changes_for_user(
             user_id, since_token.room_key, now_token.room_key
@@ -1057,7 +1056,7 @@ class SyncHandler(object):
             # we do send down the room, and with full state, where necessary
 
             old_state_ids = None
-            if room_id in joined_room_ids and non_joins:
+            if room_id in sync_result_builder.joined_room_ids and non_joins:
                 # Always include if the user (re)joined the room, especially
                 # important so that device list changes are calculated correctly.
                 # If there are non join member events, but we are still in the room,
@@ -1067,7 +1066,7 @@ class SyncHandler(object):
                 # User is in the room so we don't need to do the invite/leave checks
                 continue
 
-            if room_id in joined_room_ids or has_join:
+            if room_id in sync_result_builder.joined_room_ids or has_join:
                 old_state_ids = yield self.get_state_at(room_id, since_token)
                 old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
                 old_mem_ev = None
@@ -1079,7 +1078,7 @@ class SyncHandler(object):
                     newly_joined_rooms.append(room_id)
 
             # If user is in the room then we don't need to do the invite/leave checks
-            if room_id in joined_room_ids:
+            if room_id in sync_result_builder.joined_room_ids:
                 continue
 
             if not non_joins:
@@ -1146,7 +1145,7 @@ class SyncHandler(object):
 
         # Get all events for rooms we're currently joined to.
         room_to_events = yield self.store.get_room_events_stream_for_rooms(
-            room_ids=joined_room_ids,
+            room_ids=sync_result_builder.joined_room_ids,
             from_key=since_token.room_key,
             to_key=now_token.room_key,
             limit=timeline_limit + 1,
@@ -1154,7 +1153,7 @@ class SyncHandler(object):
 
         # We loop through all room ids, even if there are no new events, in case
         # there are non room events taht we need to notify about.
-        for room_id in joined_room_ids:
+        for room_id in sync_result_builder.joined_room_ids:
             room_entry = room_to_events.get(room_id, None)
 
             if room_entry:
@@ -1362,6 +1361,54 @@ class SyncHandler(object):
         else:
             raise Exception("Unrecognized rtype: %r", room_builder.rtype)
 
+    @defer.inlineCallbacks
+    def get_rooms_for_user_at(self, user_id, stream_ordering):
+        """Get set of joined rooms for a user at the given stream ordering.
+
+        The stream ordering *must* be recent, otherwise this may throw an
+        exception if older than a month. (This function is called with the
+        current token, which should be perfectly fine).
+
+        Args:
+            user_id (str)
+            stream_ordering (int)
+
+        ReturnValue:
+            Deferred[frozenset[str]]: Set of room_ids the user is in at given
+            stream_ordering.
+        """
+        joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering(
+            user_id,
+        )
+
+        joined_room_ids = set()
+
+        # We need to check that the stream ordering of the join for each room
+        # is before the stream_ordering asked for. This might not be the case
+        # if the user joins a room between us getting the current token and
+        # calling `get_rooms_for_user_with_stream_ordering`.
+        # If the membership's stream ordering is after the given stream
+        # ordering, we need to go and work out if the user was in the room
+        # before.
+        for room_id, membership_stream_ordering in joined_rooms:
+            if membership_stream_ordering <= stream_ordering:
+                joined_room_ids.add(room_id)
+                continue
+
+            logger.info("User joined room after current token: %s", room_id)
+
+            extrems = yield self.store.get_forward_extremeties_for_room(
+                room_id, stream_ordering,
+            )
+            users_in_room = yield self.state.get_current_user_in_room(
+                room_id, extrems,
+            )
+            if user_id in users_in_room:
+                joined_room_ids.add(room_id)
+
+        joined_room_ids = frozenset(joined_room_ids)
+        defer.returnValue(joined_room_ids)
+
 
 def _action_has_highlight(actions):
     for action in actions:
@@ -1411,7 +1458,8 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
 
 class SyncResultBuilder(object):
     "Used to help build up a new SyncResult for a user"
-    def __init__(self, sync_config, full_state, since_token, now_token):
+    def __init__(self, sync_config, full_state, since_token, now_token,
+                 joined_room_ids):
         """
         Args:
             sync_config(SyncConfig)
@@ -1423,6 +1471,7 @@ class SyncResultBuilder(object):
         self.full_state = full_state
         self.since_token = since_token
         self.now_token = now_token
+        self.joined_room_ids = joined_room_ids
 
         self.presence = []
         self.account_data = []
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 82dedbbc99..77c0cf146f 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -56,7 +56,7 @@ class TypingHandler(object):
 
         self.federation = hs.get_federation_sender()
 
-        hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu)
+        hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
 
         hs.get_distributor().observe("user_left_room", self.user_left_room)