summary refs log tree commit diff
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@arasphere.net>2018-03-14 15:38:05 +0000
committerGitHub <noreply@github.com>2018-03-14 15:38:05 +0000
commit056a6df5462d45eeb4cc9870492f6147677e7213 (patch)
treea3a4c576bfd2749d297315a9e6b5cd3f53161d87
parentpep8 (diff)
parentMerge pull request #2995 from matrix-org/erikj/enable_membership_worker (diff)
downloadsynapse-056a6df5462d45eeb4cc9870492f6147677e7213.tar.xz
Merge branch 'develop' into matthew/filter_members
-rw-r--r--synapse/app/event_creator.py14
-rw-r--r--synapse/handlers/profile.py5
-rw-r--r--synapse/handlers/room_member.py287
-rw-r--r--synapse/handlers/room_member_worker.py102
-rw-r--r--synapse/replication/http/__init__.py5
-rw-r--r--synapse/replication/http/membership.py334
-rw-r--r--synapse/replication/slave/storage/profile.py21
-rw-r--r--synapse/server.py7
-rw-r--r--synapse/storage/profile.py50
-rw-r--r--synapse/storage/room.py30
10 files changed, 711 insertions, 144 deletions
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index eb593c5278..172e989b54 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -31,14 +31,20 @@ from synapse.replication.slave.storage.account_data import SlavedAccountDataStor
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.profile import SlavedProfileStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.rest.client.v1.room import RoomSendEventRestServlet
+from synapse.rest.client.v1.room import (
+    RoomSendEventRestServlet, RoomMembershipRestServlet, RoomStateEventRestServlet,
+    JoinRoomAliasServlet,
+)
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
@@ -52,6 +58,9 @@ logger = logging.getLogger("synapse.app.event_creator")
 
 
 class EventCreatorSlavedStore(
+    DirectoryStore,
+    TransactionStore,
+    SlavedProfileStore,
     SlavedAccountDataStore,
     SlavedPusherStore,
     SlavedReceiptsStore,
@@ -85,6 +94,9 @@ class EventCreatorServer(HomeServer):
                 elif name == "client":
                     resource = JsonResource(self, canonical_json=False)
                     RoomSendEventRestServlet(self).register(resource)
+                    RoomMembershipRestServlet(self).register(resource)
+                    RoomStateEventRestServlet(self).register(resource)
+                    JoinRoomAliasServlet(self).register(resource)
                     resources.update({
                         "/_matrix/client/r0": resource,
                         "/_matrix/client/unstable": resource,
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index cb710fe796..3465a787ab 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -38,7 +38,10 @@ class ProfileHandler(BaseHandler):
 
         self.user_directory_handler = hs.get_user_directory_handler()
 
-        self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS)
+        if hs.config.worker_app is None:
+            self.clock.looping_call(
+                self._update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+            )
 
     @defer.inlineCallbacks
     def get_profile(self, user_id):
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0127cf4166..9977be8831 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+import abc
 import logging
 
 from signedjson.key import decode_verify_key_bytes
@@ -31,6 +31,7 @@ from synapse.types import UserID, RoomID
 from synapse.util.async import Linearizer
 from synapse.util.distributor import user_left_room, user_joined_room
 
+
 logger = logging.getLogger(__name__)
 
 id_server_scheme = "https://"
@@ -42,6 +43,8 @@ class RoomMemberHandler(object):
     #   API that takes ID strings and returns pagination chunks. These concerns
     #   ought to be separated out a lot better.
 
+    __metaclass__ = abc.ABCMeta
+
     def __init__(self, hs):
         self.hs = hs
         self.store = hs.get_datastore()
@@ -61,9 +64,87 @@ class RoomMemberHandler(object):
         self.clock = hs.get_clock()
         self.spam_checker = hs.get_spam_checker()
 
-        self.distributor = hs.get_distributor()
-        self.distributor.declare("user_joined_room")
-        self.distributor.declare("user_left_room")
+    @abc.abstractmethod
+    def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
+        """Try and join a room that this server is not in
+
+        Args:
+            requester (Requester)
+            remote_room_hosts (list[str]): List of servers that can be used
+                to join via.
+            room_id (str): Room that we are trying to join
+            user (UserID): User who is trying to join
+            content (dict): A dict that should be used as the content of the
+                join event.
+
+        Returns:
+            Deferred
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def _remote_reject_invite(self, remote_room_hosts, room_id, target):
+        """Attempt to reject an invite for a room this server is not in. If we
+        fail to do so we locally mark the invite as rejected.
+
+        Args:
+            requester (Requester)
+            remote_room_hosts (list[str]): List of servers to use to try and
+                reject invite
+            room_id (str)
+            target (UserID): The user rejecting the invite
+
+        Returns:
+            Deferred[dict]: A dictionary to be returned to the client, may
+            include event_id etc, or nothing if we locally rejected
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
+        """Get a guest access token for a 3PID, creating a guest account if
+        one doesn't already exist.
+
+        Args:
+            requester (Requester)
+            medium (str)
+            address (str)
+            inviter_user_id (str): The user ID who is trying to invite the
+                3PID
+
+        Returns:
+            Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
+            3PID guest account.
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def _user_joined_room(self, target, room_id):
+        """Notifies distributor on master process that the user has joined the
+        room.
+
+        Args:
+            target (UserID)
+            room_id (str)
+
+        Returns:
+            Deferred|None
+        """
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def _user_left_room(self, target, room_id):
+        """Notifies distributor on master process that the user has left the
+        room.
+
+        Args:
+            target (UserID)
+            room_id (str)
+
+        Returns:
+            Deferred|None
+        """
+        raise NotImplementedError()
 
     @defer.inlineCallbacks
     def _local_membership_update(
@@ -127,83 +208,16 @@ class RoomMemberHandler(object):
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
                 newly_joined = prev_member_event.membership != Membership.JOIN
             if newly_joined:
-                yield user_joined_room(self.distributor, target, room_id)
+                yield self._user_joined_room(target, room_id)
         elif event.membership == Membership.LEAVE:
             if prev_member_event_id:
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
                 if prev_member_event.membership == Membership.JOIN:
-                    user_left_room(self.distributor, target, room_id)
+                    yield self._user_left_room(target, room_id)
 
         defer.returnValue(event)
 
     @defer.inlineCallbacks
-    def _remote_join(self, remote_room_hosts, room_id, user, content):
-        """Try and join a room that this server is not in
-
-        Args:
-            remote_room_hosts (list[str]): List of servers that can be used
-                to join via.
-            room_id (str): Room that we are trying to join
-            user (UserID): User who is trying to join
-            content (dict): A dict that should be used as the content of the
-                join event.
-
-        Returns:
-            Deferred
-        """
-        if len(remote_room_hosts) == 0:
-            raise SynapseError(404, "No known servers")
-
-        # We don't do an auth check if we are doing an invite
-        # join dance for now, since we're kinda implicitly checking
-        # that we are allowed to join when we decide whether or not we
-        # need to do the invite/join dance.
-        yield self.federation_handler.do_invite_join(
-            remote_room_hosts,
-            room_id,
-            user.to_string(),
-            content,
-        )
-        yield user_joined_room(self.distributor, user, room_id)
-
-    @defer.inlineCallbacks
-    def _remote_reject_invite(self, remote_room_hosts, room_id, target):
-        """Attempt to reject an invite for a room this server is not in. If we
-        fail to do so we locally mark the invite as rejected.
-
-        Args:
-            remote_room_hosts (list[str]): List of servers to use to try and
-                reject invite
-            room_id (str)
-            target (UserID): The user rejecting the invite
-
-        Returns:
-            Deferred[dict]: A dictionary to be returned to the client, may
-            include event_id etc, or nothing if we locally rejected
-        """
-        fed_handler = self.federation_handler
-        try:
-            ret = yield fed_handler.do_remotely_reject_invite(
-                remote_room_hosts,
-                room_id,
-                target.to_string(),
-            )
-            defer.returnValue(ret)
-        except Exception as e:
-            # if we were unable to reject the exception, just mark
-            # it as rejected on our end and plough ahead.
-            #
-            # The 'except' clause is very broad, but we need to
-            # capture everything from DNS failures upwards
-            #
-            logger.warn("Failed to reject invite: %s", e)
-
-            yield self.store.locally_reject_invite(
-                target.to_string(), room_id
-            )
-            defer.returnValue({})
-
-    @defer.inlineCallbacks
     def update_membership(
             self,
             requester,
@@ -356,7 +370,7 @@ class RoomMemberHandler(object):
                     content["kind"] = "guest"
 
                 ret = yield self._remote_join(
-                    remote_room_hosts, room_id, target, content
+                    requester, remote_room_hosts, room_id, target, content
                 )
                 defer.returnValue(ret)
 
@@ -378,7 +392,7 @@ class RoomMemberHandler(object):
                     # send the rejection to the inviter's HS.
                     remote_room_hosts = remote_room_hosts + [inviter.domain]
                     res = yield self._remote_reject_invite(
-                        remote_room_hosts, room_id, target,
+                        requester, remote_room_hosts, room_id, target,
                     )
                     defer.returnValue(res)
 
@@ -476,12 +490,12 @@ class RoomMemberHandler(object):
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
                 newly_joined = prev_member_event.membership != Membership.JOIN
             if newly_joined:
-                yield user_joined_room(self.distributor, target_user, room_id)
+                yield self._user_joined_room(target_user, room_id)
         elif event.membership == Membership.LEAVE:
             if prev_member_event_id:
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
                 if prev_member_event.membership == Membership.JOIN:
-                    user_left_room(self.distributor, target_user, room_id)
+                    yield self._user_left_room(target_user, room_id)
 
     @defer.inlineCallbacks
     def _can_guest_join(self, current_state_ids):
@@ -672,6 +686,7 @@ class RoomMemberHandler(object):
 
         token, public_keys, fallback_public_key, display_name = (
             yield self._ask_id_server_for_third_party_invite(
+                requester=requester,
                 id_server=id_server,
                 medium=medium,
                 address=address,
@@ -708,6 +723,7 @@ class RoomMemberHandler(object):
     @defer.inlineCallbacks
     def _ask_id_server_for_third_party_invite(
             self,
+            requester,
             id_server,
             medium,
             address,
@@ -724,6 +740,7 @@ class RoomMemberHandler(object):
         Asks an identity server for a third party invite.
 
         Args:
+            requester (Requester)
             id_server (str): hostname + optional port for the identity server.
             medium (str): The literal string "email".
             address (str): The third party address being invited.
@@ -766,8 +783,8 @@ class RoomMemberHandler(object):
         }
 
         if self.config.invite_3pid_guest:
-            rh = self.registration_handler
-            guest_user_id, guest_access_token = yield rh.get_or_register_3pid_guest(
+            guest_access_token, guest_user_id = yield self.get_or_register_3pid_guest(
+                requester=requester,
                 medium=medium,
                 address=address,
                 inviter_user_id=inviter_user_id,
@@ -801,27 +818,6 @@ class RoomMemberHandler(object):
         defer.returnValue((token, public_keys, fallback_public_key, display_name))
 
     @defer.inlineCallbacks
-    def forget(self, user, room_id):
-        user_id = user.to_string()
-
-        member = yield self.state_handler.get_current_state(
-            room_id=room_id,
-            event_type=EventTypes.Member,
-            state_key=user_id
-        )
-        membership = member.membership if member else None
-
-        if membership is not None and membership not in [
-            Membership.LEAVE, Membership.BAN
-        ]:
-            raise SynapseError(400, "User %s in room %s" % (
-                user_id, room_id
-            ))
-
-        if membership:
-            yield self.store.forget(user_id, room_id)
-
-    @defer.inlineCallbacks
     def _is_host_in_room(self, current_state_ids):
         # Have we just created the room, and is this about to be the very
         # first member event?
@@ -842,3 +838,94 @@ class RoomMemberHandler(object):
                 defer.returnValue(True)
 
         defer.returnValue(False)
+
+
+class RoomMemberMasterHandler(RoomMemberHandler):
+    def __init__(self, hs):
+        super(RoomMemberMasterHandler, self).__init__(hs)
+
+        self.distributor = hs.get_distributor()
+        self.distributor.declare("user_joined_room")
+        self.distributor.declare("user_left_room")
+
+    @defer.inlineCallbacks
+    def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
+        """Implements RoomMemberHandler._remote_join
+        """
+        if len(remote_room_hosts) == 0:
+            raise SynapseError(404, "No known servers")
+
+        # We don't do an auth check if we are doing an invite
+        # join dance for now, since we're kinda implicitly checking
+        # that we are allowed to join when we decide whether or not we
+        # need to do the invite/join dance.
+        yield self.federation_handler.do_invite_join(
+            remote_room_hosts,
+            room_id,
+            user.to_string(),
+            content,
+        )
+        yield self._user_joined_room(user, room_id)
+
+    @defer.inlineCallbacks
+    def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+        """Implements RoomMemberHandler._remote_reject_invite
+        """
+        fed_handler = self.federation_handler
+        try:
+            ret = yield fed_handler.do_remotely_reject_invite(
+                remote_room_hosts,
+                room_id,
+                target.to_string(),
+            )
+            defer.returnValue(ret)
+        except Exception as e:
+            # if we were unable to reject the exception, just mark
+            # it as rejected on our end and plough ahead.
+            #
+            # The 'except' clause is very broad, but we need to
+            # capture everything from DNS failures upwards
+            #
+            logger.warn("Failed to reject invite: %s", e)
+
+            yield self.store.locally_reject_invite(
+                target.to_string(), room_id
+            )
+            defer.returnValue({})
+
+    def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
+        """Implements RoomMemberHandler.get_or_register_3pid_guest
+        """
+        rg = self.registration_handler
+        return rg.get_or_register_3pid_guest(medium, address, inviter_user_id)
+
+    def _user_joined_room(self, target, room_id):
+        """Implements RoomMemberHandler._user_joined_room
+        """
+        return user_joined_room(self.distributor, target, room_id)
+
+    def _user_left_room(self, target, room_id):
+        """Implements RoomMemberHandler._user_left_room
+        """
+        return user_left_room(self.distributor, target, room_id)
+
+    @defer.inlineCallbacks
+    def forget(self, user, room_id):
+        user_id = user.to_string()
+
+        member = yield self.state_handler.get_current_state(
+            room_id=room_id,
+            event_type=EventTypes.Member,
+            state_key=user_id
+        )
+        membership = member.membership if member else None
+
+        if membership is not None and membership not in [
+            Membership.LEAVE, Membership.BAN
+        ]:
+            raise SynapseError(400, "User %s in room %s" % (
+                user_id, room_id
+            ))
+
+        if membership:
+            yield self.store.forget(user_id, room_id)
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
new file mode 100644
index 0000000000..493aec1e48
--- /dev/null
+++ b/synapse/handlers/room_member_worker.py
@@ -0,0 +1,102 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError
+from synapse.handlers.room_member import RoomMemberHandler
+from synapse.replication.http.membership import (
+    remote_join, remote_reject_invite, get_or_register_3pid_guest,
+    notify_user_membership_change,
+)
+
+
+logger = logging.getLogger(__name__)
+
+
+class RoomMemberWorkerHandler(RoomMemberHandler):
+    @defer.inlineCallbacks
+    def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
+        """Implements RoomMemberHandler._remote_join
+        """
+        if len(remote_room_hosts) == 0:
+            raise SynapseError(404, "No known servers")
+
+        ret = yield remote_join(
+            self.simple_http_client,
+            host=self.config.worker_replication_host,
+            port=self.config.worker_replication_http_port,
+            requester=requester,
+            remote_room_hosts=remote_room_hosts,
+            room_id=room_id,
+            user_id=user.to_string(),
+            content=content,
+        )
+
+        yield self._user_joined_room(user, room_id)
+
+        defer.returnValue(ret)
+
+    def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
+        """Implements RoomMemberHandler._remote_reject_invite
+        """
+        return remote_reject_invite(
+            self.simple_http_client,
+            host=self.config.worker_replication_host,
+            port=self.config.worker_replication_http_port,
+            requester=requester,
+            remote_room_hosts=remote_room_hosts,
+            room_id=room_id,
+            user_id=target.to_string(),
+        )
+
+    def _user_joined_room(self, target, room_id):
+        """Implements RoomMemberHandler._user_joined_room
+        """
+        return notify_user_membership_change(
+            self.simple_http_client,
+            host=self.config.worker_replication_host,
+            port=self.config.worker_replication_http_port,
+            user_id=target.to_string(),
+            room_id=room_id,
+            change="joined",
+        )
+
+    def _user_left_room(self, target, room_id):
+        """Implements RoomMemberHandler._user_left_room
+        """
+        return notify_user_membership_change(
+            self.simple_http_client,
+            host=self.config.worker_replication_host,
+            port=self.config.worker_replication_http_port,
+            user_id=target.to_string(),
+            room_id=room_id,
+            change="left",
+        )
+
+    def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
+        """Implements RoomMemberHandler.get_or_register_3pid_guest
+        """
+        return get_or_register_3pid_guest(
+            self.simple_http_client,
+            host=self.config.worker_replication_host,
+            port=self.config.worker_replication_http_port,
+            requester=requester,
+            medium=medium,
+            address=address,
+            inviter_user_id=inviter_user_id,
+        )
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index b378b41646..1d7a607529 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -13,10 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-import send_event
-
 from synapse.http.server import JsonResource
+from synapse.replication.http import membership, send_event
 
 
 REPLICATION_PREFIX = "/_synapse/replication"
@@ -29,3 +27,4 @@ class ReplicationRestResource(JsonResource):
 
     def register_servlets(self, hs):
         send_event.register_servlets(hs, self)
+        membership.register_servlets(hs, self)
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
new file mode 100644
index 0000000000..e66c4e881f
--- /dev/null
+++ b/synapse/replication/http/membership.py
@@ -0,0 +1,334 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError, MatrixCodeMessageException
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.types import Requester, UserID
+from synapse.util.distributor import user_left_room, user_joined_room
+
+logger = logging.getLogger(__name__)
+
+
+@defer.inlineCallbacks
+def remote_join(client, host, port, requester, remote_room_hosts,
+                room_id, user_id, content):
+    """Ask the master to do a remote join for the given user to the given room
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication
+        requester (Requester)
+        remote_room_hosts (list[str]): Servers to try and join via
+        room_id (str)
+        user_id (str)
+        content (dict): The event content to use for the join event
+
+    Returns:
+        Deferred
+    """
+    uri = "http://%s:%s/_synapse/replication/remote_join" % (host, port)
+
+    payload = {
+        "requester": requester.serialize(),
+        "remote_room_hosts": remote_room_hosts,
+        "room_id": room_id,
+        "user_id": user_id,
+        "content": content,
+    }
+
+    try:
+        result = yield client.post_json_get_json(uri, payload)
+    except MatrixCodeMessageException as e:
+        # We convert to SynapseError as we know that it was a SynapseError
+        # on the master process that we should send to the client. (And
+        # importantly, not stack traces everywhere)
+        raise SynapseError(e.code, e.msg, e.errcode)
+    defer.returnValue(result)
+
+
+@defer.inlineCallbacks
+def remote_reject_invite(client, host, port, requester, remote_room_hosts,
+                         room_id, user_id):
+    """Ask master to reject the invite for the user and room.
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication
+        requester (Requester)
+        remote_room_hosts (list[str]): Servers to try and reject via
+        room_id (str)
+        user_id (str)
+
+    Returns:
+        Deferred
+    """
+    uri = "http://%s:%s/_synapse/replication/remote_reject_invite" % (host, port)
+
+    payload = {
+        "requester": requester.serialize(),
+        "remote_room_hosts": remote_room_hosts,
+        "room_id": room_id,
+        "user_id": user_id,
+    }
+
+    try:
+        result = yield client.post_json_get_json(uri, payload)
+    except MatrixCodeMessageException as e:
+        # We convert to SynapseError as we know that it was a SynapseError
+        # on the master process that we should send to the client. (And
+        # importantly, not stack traces everywhere)
+        raise SynapseError(e.code, e.msg, e.errcode)
+    defer.returnValue(result)
+
+
+@defer.inlineCallbacks
+def get_or_register_3pid_guest(client, host, port, requester,
+                               medium, address, inviter_user_id):
+    """Ask the master to get/create a guest account for given 3PID.
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication
+        requester (Requester)
+        medium (str)
+        address (str)
+        inviter_user_id (str): The user ID who is trying to invite the
+            3PID
+
+    Returns:
+        Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
+        3PID guest account.
+    """
+
+    uri = "http://%s:%s/_synapse/replication/get_or_register_3pid_guest" % (host, port)
+
+    payload = {
+        "requester": requester.serialize(),
+        "medium": medium,
+        "address": address,
+        "inviter_user_id": inviter_user_id,
+    }
+
+    try:
+        result = yield client.post_json_get_json(uri, payload)
+    except MatrixCodeMessageException as e:
+        # We convert to SynapseError as we know that it was a SynapseError
+        # on the master process that we should send to the client. (And
+        # importantly, not stack traces everywhere)
+        raise SynapseError(e.code, e.msg, e.errcode)
+    defer.returnValue(result)
+
+
+@defer.inlineCallbacks
+def notify_user_membership_change(client, host, port, user_id, room_id, change):
+    """Notify master that a user has joined or left the room
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication.
+        user_id (str)
+        room_id (str)
+        change (str): Either "join" or "left"
+
+    Returns:
+        Deferred
+    """
+    assert change in ("joined", "left")
+
+    uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change)
+
+    payload = {
+        "user_id": user_id,
+        "room_id": room_id,
+    }
+
+    try:
+        result = yield client.post_json_get_json(uri, payload)
+    except MatrixCodeMessageException as e:
+        # We convert to SynapseError as we know that it was a SynapseError
+        # on the master process that we should send to the client. (And
+        # importantly, not stack traces everywhere)
+        raise SynapseError(e.code, e.msg, e.errcode)
+    defer.returnValue(result)
+
+
+class ReplicationRemoteJoinRestServlet(RestServlet):
+    PATTERNS = [re.compile("^/_synapse/replication/remote_join$")]
+
+    def __init__(self, hs):
+        super(ReplicationRemoteJoinRestServlet, self).__init__()
+
+        self.federation_handler = hs.get_handlers().federation_handler
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        remote_room_hosts = content["remote_room_hosts"]
+        room_id = content["room_id"]
+        user_id = content["user_id"]
+        event_content = content["content"]
+
+        requester = Requester.deserialize(self.store, content["requester"])
+
+        if requester.user:
+            request.authenticated_entity = requester.user.to_string()
+
+        logger.info(
+            "remote_join: %s into room: %s",
+            user_id, room_id,
+        )
+
+        yield self.federation_handler.do_invite_join(
+            remote_room_hosts,
+            room_id,
+            user_id,
+            event_content,
+        )
+
+        defer.returnValue((200, {}))
+
+
+class ReplicationRemoteRejectInviteRestServlet(RestServlet):
+    PATTERNS = [re.compile("^/_synapse/replication/remote_reject_invite$")]
+
+    def __init__(self, hs):
+        super(ReplicationRemoteRejectInviteRestServlet, self).__init__()
+
+        self.federation_handler = hs.get_handlers().federation_handler
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        remote_room_hosts = content["remote_room_hosts"]
+        room_id = content["room_id"]
+        user_id = content["user_id"]
+
+        requester = Requester.deserialize(self.store, content["requester"])
+
+        if requester.user:
+            request.authenticated_entity = requester.user.to_string()
+
+        logger.info(
+            "remote_reject_invite: %s out of room: %s",
+            user_id, room_id,
+        )
+
+        try:
+            event = yield self.federation_handler.do_remotely_reject_invite(
+                remote_room_hosts,
+                room_id,
+                user_id,
+            )
+            ret = event.get_pdu_json()
+        except Exception as e:
+            # if we were unable to reject the exception, just mark
+            # it as rejected on our end and plough ahead.
+            #
+            # The 'except' clause is very broad, but we need to
+            # capture everything from DNS failures upwards
+            #
+            logger.warn("Failed to reject invite: %s", e)
+
+            yield self.store.locally_reject_invite(
+                user_id, room_id
+            )
+            ret = {}
+
+        defer.returnValue((200, ret))
+
+
+class ReplicationRegister3PIDGuestRestServlet(RestServlet):
+    PATTERNS = [re.compile("^/_synapse/replication/get_or_register_3pid_guest$")]
+
+    def __init__(self, hs):
+        super(ReplicationRegister3PIDGuestRestServlet, self).__init__()
+
+        self.registeration_handler = hs.get_handlers().registration_handler
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        medium = content["medium"]
+        address = content["address"]
+        inviter_user_id = content["inviter_user_id"]
+
+        requester = Requester.deserialize(self.store, content["requester"])
+
+        if requester.user:
+            request.authenticated_entity = requester.user.to_string()
+
+        logger.info("get_or_register_3pid_guest: %r", content)
+
+        ret = yield self.registeration_handler.get_or_register_3pid_guest(
+            medium, address, inviter_user_id,
+        )
+
+        defer.returnValue((200, ret))
+
+
+class ReplicationUserJoinedLeftRoomRestServlet(RestServlet):
+    PATTERNS = [re.compile("^/_synapse/replication/user_(?P<change>joined|left)_room$")]
+
+    def __init__(self, hs):
+        super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__()
+
+        self.registeration_handler = hs.get_handlers().registration_handler
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+        self.distributor = hs.get_distributor()
+
+    def on_POST(self, request, change):
+        content = parse_json_object_from_request(request)
+
+        user_id = content["user_id"]
+        room_id = content["room_id"]
+
+        logger.info("user membership change: %s in %s", user_id, room_id)
+
+        user = UserID.from_string(user_id)
+
+        if change == "joined":
+            user_joined_room(self.distributor, user, room_id)
+        elif change == "left":
+            user_left_room(self.distributor, user, room_id)
+        else:
+            raise Exception("Unrecognized change: %r", change)
+
+        return (200, {})
+
+
+def register_servlets(hs, http_server):
+    ReplicationRemoteJoinRestServlet(hs).register(http_server)
+    ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
+    ReplicationRegister3PIDGuestRestServlet(hs).register(http_server)
+    ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
diff --git a/synapse/replication/slave/storage/profile.py b/synapse/replication/slave/storage/profile.py
new file mode 100644
index 0000000000..46c28d4171
--- /dev/null
+++ b/synapse/replication/slave/storage/profile.py
@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.storage.profile import ProfileWorkerStore
+
+
+class SlavedProfileStore(ProfileWorkerStore, BaseSlavedStore):
+    pass
diff --git a/synapse/server.py b/synapse/server.py
index 43c6e0a6d6..cd0c1a51be 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -47,7 +47,8 @@ from synapse.handlers.device import DeviceHandler
 from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.presence import PresenceHandler
 from synapse.handlers.room_list import RoomListHandler
-from synapse.handlers.room_member import RoomMemberHandler
+from synapse.handlers.room_member import RoomMemberMasterHandler
+from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
 from synapse.handlers.set_password import SetPasswordHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
@@ -392,7 +393,9 @@ class HomeServer(object):
         return SpamChecker(self)
 
     def build_room_member_handler(self):
-        return RoomMemberHandler(self)
+        if self.config.worker_app:
+            return RoomMemberWorkerHandler(self)
+        return RoomMemberMasterHandler(self)
 
     def build_federation_registry(self):
         return FederationHandlerRegistry()
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index ec02e73bc2..8612bd5ecc 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -21,14 +21,7 @@ from synapse.api.errors import StoreError
 from ._base import SQLBaseStore
 
 
-class ProfileStore(SQLBaseStore):
-    def create_profile(self, user_localpart):
-        return self._simple_insert(
-            table="profiles",
-            values={"user_id": user_localpart},
-            desc="create_profile",
-        )
-
+class ProfileWorkerStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_profileinfo(self, user_localpart):
         try:
@@ -61,14 +54,6 @@ class ProfileStore(SQLBaseStore):
             desc="get_profile_displayname",
         )
 
-    def set_profile_displayname(self, user_localpart, new_displayname):
-        return self._simple_update_one(
-            table="profiles",
-            keyvalues={"user_id": user_localpart},
-            updatevalues={"displayname": new_displayname},
-            desc="set_profile_displayname",
-        )
-
     def get_profile_avatar_url(self, user_localpart):
         return self._simple_select_one_onecol(
             table="profiles",
@@ -77,14 +62,6 @@ class ProfileStore(SQLBaseStore):
             desc="get_profile_avatar_url",
         )
 
-    def set_profile_avatar_url(self, user_localpart, new_avatar_url):
-        return self._simple_update_one(
-            table="profiles",
-            keyvalues={"user_id": user_localpart},
-            updatevalues={"avatar_url": new_avatar_url},
-            desc="set_profile_avatar_url",
-        )
-
     def get_from_remote_profile_cache(self, user_id):
         return self._simple_select_one(
             table="remote_profile_cache",
@@ -94,6 +71,31 @@ class ProfileStore(SQLBaseStore):
             desc="get_from_remote_profile_cache",
         )
 
+
+class ProfileStore(ProfileWorkerStore):
+    def create_profile(self, user_localpart):
+        return self._simple_insert(
+            table="profiles",
+            values={"user_id": user_localpart},
+            desc="create_profile",
+        )
+
+    def set_profile_displayname(self, user_localpart, new_displayname):
+        return self._simple_update_one(
+            table="profiles",
+            keyvalues={"user_id": user_localpart},
+            updatevalues={"displayname": new_displayname},
+            desc="set_profile_displayname",
+        )
+
+    def set_profile_avatar_url(self, user_localpart, new_avatar_url):
+        return self._simple_update_one(
+            table="profiles",
+            keyvalues={"user_id": user_localpart},
+            updatevalues={"avatar_url": new_avatar_url},
+            desc="set_profile_avatar_url",
+        )
+
     def add_remote_profile_cache(self, user_id, displayname, avatar_url):
         """Ensure we are caching the remote user's profiles.
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 7f2c08d7a6..34ed84ea22 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -157,6 +157,18 @@ class RoomWorkerStore(SQLBaseStore):
             "get_public_room_changes", get_public_room_changes_txn
         )
 
+    @cached(max_entries=10000)
+    def is_room_blocked(self, room_id):
+        return self._simple_select_one_onecol(
+            table="blocked_rooms",
+            keyvalues={
+                "room_id": room_id,
+            },
+            retcol="1",
+            allow_none=True,
+            desc="is_room_blocked",
+        )
+
 
 class RoomStore(RoomWorkerStore, SearchStore):
 
@@ -485,18 +497,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
         else:
             defer.returnValue(None)
 
-    @cached(max_entries=10000)
-    def is_room_blocked(self, room_id):
-        return self._simple_select_one_onecol(
-            table="blocked_rooms",
-            keyvalues={
-                "room_id": room_id,
-            },
-            retcol="1",
-            allow_none=True,
-            desc="is_room_blocked",
-        )
-
     @defer.inlineCallbacks
     def block_room(self, room_id, user_id):
         yield self._simple_insert(
@@ -507,7 +507,11 @@ class RoomStore(RoomWorkerStore, SearchStore):
             },
             desc="block_room",
         )
-        self.is_room_blocked.invalidate((room_id,))
+        yield self.runInteraction(
+            "block_room_invalidation",
+            self._invalidate_cache_and_stream,
+            self.is_room_blocked, (room_id,),
+        )
 
     def get_media_mxcs_in_room(self, room_id):
         """Retrieves all the local and remote media MXC URIs in a given room