Diffstat (limited to 'synapse/handlers')
-rw-r--r--  synapse/handlers/_base.py                 5
-rw-r--r--  synapse/handlers/auth.py                 99
-rw-r--r--  synapse/handlers/deactivate_account.py    1
-rw-r--r--  synapse/handlers/device.py              392
-rw-r--r--  synapse/handlers/directory.py             8
-rw-r--r--  synapse/handlers/events.py                7
-rw-r--r--  synapse/handlers/federation.py          140
-rw-r--r--  synapse/handlers/initial_sync.py          6
-rw-r--r--  synapse/handlers/message.py              27
-rw-r--r--  synapse/handlers/presence.py              6
-rw-r--r--  synapse/handlers/profile.py              10
-rw-r--r--  synapse/handlers/receipts.py            114
-rw-r--r--  synapse/handlers/register.py             50
-rw-r--r--  synapse/handlers/room_list.py            14
-rw-r--r--  synapse/handlers/room_member.py          10
-rw-r--r--  synapse/handlers/state_deltas.py         70
-rw-r--r--  synapse/handlers/sync.py                 86
-rw-r--r--  synapse/handlers/typing.py                2
-rw-r--r--  synapse/handlers/user_directory.py      423
19 files changed, 804 insertions, 666 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 594754cfd8..ac09d03ba9 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -93,9 +93,9 @@ class BaseHandler(object):
             messages_per_second = self.hs.config.rc_messages_per_second
             burst_count = self.hs.config.rc_message_burst_count
 
-        allowed, time_allowed = self.ratelimiter.send_message(
+        allowed, time_allowed = self.ratelimiter.can_do_action(
             user_id, time_now,
-            msg_rate_hz=messages_per_second,
+            rate_hz=messages_per_second,
             burst_count=burst_count,
             update=update,
         )
@@ -165,6 +165,7 @@ class BaseHandler(object):
                     member_event.room_id,
                     "leave",
                     ratelimit=False,
+                    require_consent=False,
                 )
             except Exception as e:
                 logger.exception("Error kicking guest user: %s" % (e,))
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2abd9af94f..4544de821d 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -35,6 +35,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
 from synapse.util import logcontext
@@ -99,6 +100,11 @@ class AuthHandler(BaseHandler):
                         login_types.append(t)
         self._supported_login_types = login_types
 
+        self._account_ratelimiter = Ratelimiter()
+        self._failed_attempts_ratelimiter = Ratelimiter()
+
+        self._clock = self.hs.get_clock()
+
     @defer.inlineCallbacks
     def validate_user_via_ui_auth(self, requester, request_body, clientip):
         """
@@ -568,7 +574,12 @@ class AuthHandler(BaseHandler):
         Returns:
             defer.Deferred: (unicode) canonical_user_id, or None if zero or
             multiple matches
+
+        Raises:
+            LimitExceededError if the ratelimiter's login requests count for this
+                user is too high to proceed.
         """
+        self.ratelimit_login_per_account(user_id)
         res = yield self._find_user_id_and_pwd_hash(user_id)
         if res is not None:
             defer.returnValue(res[0])
@@ -634,6 +645,8 @@ class AuthHandler(BaseHandler):
             StoreError if there was a problem accessing the database
             SynapseError if there was a problem with the request
             LoginError if there was an authentication problem.
+            LimitExceededError if the ratelimiter's login requests count for this
+                user is too high to proceed.
         """
 
         if username.startswith('@'):
@@ -643,6 +656,8 @@ class AuthHandler(BaseHandler):
                 username, self.hs.hostname
             ).to_string()
 
+        self.ratelimit_login_per_account(qualified_user_id)
+
         login_type = login_submission.get("type")
         known_login_type = False
 
@@ -715,15 +730,58 @@ class AuthHandler(BaseHandler):
         if not known_login_type:
             raise SynapseError(400, "Unknown login type %s" % login_type)
 
-        # unknown username or invalid password. We raise a 403 here, but note
-        # that if we're doing user-interactive login, it turns all LoginErrors
-        # into a 401 anyway.
+        # unknown username or invalid password.
+        self._failed_attempts_ratelimiter.ratelimit(
+            qualified_user_id.lower(), time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+            update=True,
+        )
+
+        # We raise a 403 here, but note that if we're doing user-interactive
+        # login, it turns all LoginErrors into a 401 anyway.
         raise LoginError(
             403, "Invalid password",
             errcode=Codes.FORBIDDEN
         )
 
     @defer.inlineCallbacks
+    def check_password_provider_3pid(self, medium, address, password):
+        """Check if a password provider is able to validate a thirdparty login
+
+        Args:
+            medium (str): The medium of the 3pid (ex. email).
+            address (str): The address of the 3pid (ex. jdoe@example.com).
+            password (str): The password of the user.
+
+        Returns:
+            Deferred[(str|None, func|None)]: A tuple of `(user_id,
+            callback)`. If authentication is successful, `user_id` is a `str`
+            containing the authenticated, canonical user ID. `callback` is
+            then either a function to be later run after the server has
+            completed login/registration, or `None`. If authentication was
+            unsuccessful, `user_id` and `callback` are both `None`.
+        """
+        for provider in self.password_providers:
+            if hasattr(provider, "check_3pid_auth"):
+                # This function is able to return a deferred that either
+                # resolves None, meaning authentication failure, or upon
+                # success, to a str (which is the user_id) or a tuple of
+                # (user_id, callback_func), where callback_func should be run
+                # after we've finished everything else
+                result = yield provider.check_3pid_auth(
+                    medium, address, password,
+                )
+                if result:
+                    # Check if the return value is a str or a tuple
+                    if isinstance(result, str):
+                        # If it's a str, set callback function to None
+                        result = (result, None)
+                    defer.returnValue(result)
+
+        defer.returnValue((None, None))
+
+    @defer.inlineCallbacks
     def _check_local_password(self, user_id, password):
         """Authenticate a user against the local password database.
 
@@ -734,7 +792,12 @@ class AuthHandler(BaseHandler):
             user_id (unicode): complete @user:id
             password (unicode): the provided password
         Returns:
-            (unicode) the canonical_user_id, or None if unknown user / bad password
+            Deferred[unicode] the canonical_user_id, or Deferred[None] if
+                unknown user/bad password
+
+        Raises:
+            LimitExceededError if the ratelimiter's login requests count for this
+                user is too high to proceed.
         """
         lookupres = yield self._find_user_id_and_pwd_hash(user_id)
         if not lookupres:
@@ -763,6 +826,7 @@ class AuthHandler(BaseHandler):
             auth_api.validate_macaroon(macaroon, "login", True, user_id)
         except Exception:
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+        self.ratelimit_login_per_account(user_id)
         yield self.auth.check_auth_blocking(user_id)
         defer.returnValue(user_id)
 
@@ -934,6 +998,33 @@ class AuthHandler(BaseHandler):
         else:
             return defer.succeed(False)
 
+    def ratelimit_login_per_account(self, user_id):
+        """Checks whether the process must be stopped because of ratelimiting.
+
+        Checks against two ratelimiters: the generic one for login attempts per
+        account and the one specific to failed attempts.
+
+        Args:
+            user_id (unicode): complete @user:id
+
+        Raises:
+            LimitExceededError if one of the ratelimiters' login requests count
+                for this user is too high to proceed.
+        """
+        self._failed_attempts_ratelimiter.ratelimit(
+            user_id.lower(), time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+            update=False,
+        )
+
+        self._account_ratelimiter.ratelimit(
+            user_id.lower(), time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_account.per_second,
+            burst_count=self.hs.config.rc_login_account.burst_count,
+            update=True,
+        )
+
 
 @attr.s
 class MacaroonGenerator(object):
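
For illustration, a password provider that the new check_password_provider_3pid
loop could call might look like the sketch below. The provider class and its
lookup table are invented; only the check_3pid_auth contract (resolve to None,
a bare user_id str, or a (user_id, callback) tuple) comes from the docstring
above.

    from twisted.internet import defer

    class ExampleThreepidAuthProvider(object):
        """Hypothetical password provider exposing check_3pid_auth."""

        def __init__(self, config, account_handler):
            self.account_handler = account_handler
            # Invented lookup table: (medium, address) -> (user_id, password)
            self.users = {
                ("email", "jdoe@example.com"): ("@jdoe:example.com", "hunter2"),
            }

        def check_3pid_auth(self, medium, address, password):
            entry = self.users.get((medium, address))
            if entry is None or entry[1] != password:
                # Resolving to None signals authentication failure.
                return defer.succeed(None)

            user_id, _ = entry

            def on_logged_in(*args):
                # Run by the server after login/registration completes.
                pass

            # Returning a bare user_id str would also work; the handler
            # normalises it to (user_id, None).
            return defer.succeed((user_id, on_logged_in))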
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 75fe50c42c..97d3f31d98 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -164,6 +164,7 @@ class DeactivateAccountHandler(BaseHandler):
                     room_id,
                     "leave",
                     ratelimit=False,
+                    require_consent=False,
                 )
             except Exception:
                 logger.exception(
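
Both call sites (the guest kick in _base.py above and the deactivation leave
here) are server-initiated, so they must not be blocked by the consent
requirement; that is what the new flag conveys. A hypothetical helper showing
the shape of such a call:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def server_initiated_leave(room_member_handler, requester, user, room_id):
        """Leave a room on the server's initiative, skipping both
        ratelimiting and the privacy-policy consent check."""
        yield room_member_handler.update_membership(
            requester,
            user,
            room_id,
            "leave",
            ratelimit=False,
            require_consent=False,
        )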
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index c708c35d4d..b398848079 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -37,13 +37,185 @@ from ._base import BaseHandler
 logger = logging.getLogger(__name__)
 
 
-class DeviceHandler(BaseHandler):
+class DeviceWorkerHandler(BaseHandler):
     def __init__(self, hs):
-        super(DeviceHandler, self).__init__(hs)
+        super(DeviceWorkerHandler, self).__init__(hs)
 
         self.hs = hs
         self.state = hs.get_state_handler()
         self._auth_handler = hs.get_auth_handler()
+
+    @defer.inlineCallbacks
+    def get_devices_by_user(self, user_id):
+        """
+        Retrieve the given user's devices
+
+        Args:
+            user_id (str):
+        Returns:
+            defer.Deferred: list[dict[str, X]]: info on each device
+        """
+
+        device_map = yield self.store.get_devices_by_user(user_id)
+
+        ips = yield self.store.get_last_client_ip_by_device(
+            user_id, device_id=None
+        )
+
+        devices = list(device_map.values())
+        for device in devices:
+            _update_device_from_client_ips(device, ips)
+
+        defer.returnValue(devices)
+
+    @defer.inlineCallbacks
+    def get_device(self, user_id, device_id):
+        """ Retrieve the given device
+
+        Args:
+            user_id (str):
+            device_id (str):
+
+        Returns:
+            defer.Deferred: dict[str, X]: info on the device
+        Raises:
+            errors.NotFoundError: if the device was not found
+        """
+        try:
+            device = yield self.store.get_device(user_id, device_id)
+        except errors.StoreError:
+            raise errors.NotFoundError
+        ips = yield self.store.get_last_client_ip_by_device(
+            user_id, device_id,
+        )
+        _update_device_from_client_ips(device, ips)
+        defer.returnValue(device)
+
+    @measure_func("device.get_user_ids_changed")
+    @defer.inlineCallbacks
+    def get_user_ids_changed(self, user_id, from_token):
+        """Get list of users that have had the devices updated, or have newly
+        joined a room, that `user_id` may be interested in.
+
+        Args:
+            user_id (str)
+            from_token (StreamToken)
+        """
+        now_room_key = yield self.store.get_room_events_max_id()
+
+        room_ids = yield self.store.get_rooms_for_user(user_id)
+
+        # First we check if any devices have changed
+        changed = yield self.store.get_user_whose_devices_changed(
+            from_token.device_list_key
+        )
+
+        # Then work out if any users have since joined
+        rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
+
+        member_events = yield self.store.get_membership_changes_for_user(
+            user_id, from_token.room_key, now_room_key,
+        )
+        rooms_changed.update(event.room_id for event in member_events)
+
+        stream_ordering = RoomStreamToken.parse_stream_token(
+            from_token.room_key
+        ).stream
+
+        possibly_changed = set(changed)
+        possibly_left = set()
+        for room_id in rooms_changed:
+            current_state_ids = yield self.store.get_current_state_ids(room_id)
+
+            # The user may have left the room
+            # TODO: Check if they actually did or if we were just invited.
+            if room_id not in room_ids:
+                for key, event_id in iteritems(current_state_ids):
+                    etype, state_key = key
+                    if etype != EventTypes.Member:
+                        continue
+                    possibly_left.add(state_key)
+                continue
+
+            # Fetch the current state at the time.
+            try:
+                event_ids = yield self.store.get_forward_extremeties_for_room(
+                    room_id, stream_ordering=stream_ordering
+                )
+            except errors.StoreError:
+                # we have purged the stream_ordering index since the stream
+                # ordering: treat it the same as a new room
+                event_ids = []
+
+            # special-case for an empty prev state: include all members
+            # in the changed list
+            if not event_ids:
+                for key, event_id in iteritems(current_state_ids):
+                    etype, state_key = key
+                    if etype != EventTypes.Member:
+                        continue
+                    possibly_changed.add(state_key)
+                continue
+
+            current_member_id = current_state_ids.get((EventTypes.Member, user_id))
+            if not current_member_id:
+                continue
+
+            # mapping from event_id -> state_dict
+            prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+
+            # Check if we've joined the room. If so, we just blindly add all the users to
+            # the "possibly changed" users.
+            for state_dict in itervalues(prev_state_ids):
+                member_event = state_dict.get((EventTypes.Member, user_id), None)
+                if not member_event or member_event != current_member_id:
+                    for key, event_id in iteritems(current_state_ids):
+                        etype, state_key = key
+                        if etype != EventTypes.Member:
+                            continue
+                        possibly_changed.add(state_key)
+                    break
+
+            # If there has been any change in membership, include them in the
+            # possibly changed list. We'll check if they are joined below,
+            # and we're not toooo worried about spuriously adding users.
+            for key, event_id in iteritems(current_state_ids):
+                etype, state_key = key
+                if etype != EventTypes.Member:
+                    continue
+
+                # check if this member has changed since any of the extremities
+                # at the stream_ordering, and add them to the list if so.
+                for state_dict in itervalues(prev_state_ids):
+                    prev_event_id = state_dict.get(key, None)
+                    if not prev_event_id or prev_event_id != event_id:
+                        if state_key != user_id:
+                            possibly_changed.add(state_key)
+                        break
+
+        if possibly_changed or possibly_left:
+            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+                user_id
+            )
+
+            # Take the intersection of the users whose devices may have changed
+            # and those that actually still share a room with the user
+            possibly_joined = possibly_changed & users_who_share_room
+            possibly_left = (possibly_changed | possibly_left) - users_who_share_room
+        else:
+            possibly_joined = []
+            possibly_left = []
+
+        defer.returnValue({
+            "changed": list(possibly_joined),
+            "left": list(possibly_left),
+        })
+
+
+class DeviceHandler(DeviceWorkerHandler):
+    def __init__(self, hs):
+        super(DeviceHandler, self).__init__(hs)
+
         self.federation_sender = hs.get_federation_sender()
 
         self._edu_updater = DeviceListEduUpdater(hs, self)
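
Splitting the read-only queries into DeviceWorkerHandler presumably lets
worker processes answer device lookups while the master keeps the mutating
paths and federation notifications. A sketch of the assumed selection on the
HomeServer (the builder name is hypothetical):

    def build_device_handler(self):
        # Workers only need the read side; the master also has to send
        # device-list updates over federation, so it gets the full handler.
        if self.config.worker_app:
            return DeviceWorkerHandler(self)
        else:
            return DeviceHandler(self)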
@@ -104,52 +276,6 @@ class DeviceHandler(BaseHandler):
         raise errors.StoreError(500, "Couldn't generate a device ID.")
 
     @defer.inlineCallbacks
-    def get_devices_by_user(self, user_id):
-        """
-        Retrieve the given user's devices
-
-        Args:
-            user_id (str):
-        Returns:
-            defer.Deferred: list[dict[str, X]]: info on each device
-        """
-
-        device_map = yield self.store.get_devices_by_user(user_id)
-
-        ips = yield self.store.get_last_client_ip_by_device(
-            user_id, device_id=None
-        )
-
-        devices = list(device_map.values())
-        for device in devices:
-            _update_device_from_client_ips(device, ips)
-
-        defer.returnValue(devices)
-
-    @defer.inlineCallbacks
-    def get_device(self, user_id, device_id):
-        """ Retrieve the given device
-
-        Args:
-            user_id (str):
-            device_id (str):
-
-        Returns:
-            defer.Deferred: dict[str, X]: info on the device
-        Raises:
-            errors.NotFoundError: if the device was not found
-        """
-        try:
-            device = yield self.store.get_device(user_id, device_id)
-        except errors.StoreError:
-            raise errors.NotFoundError
-        ips = yield self.store.get_last_client_ip_by_device(
-            user_id, device_id,
-        )
-        _update_device_from_client_ips(device, ips)
-        defer.returnValue(device)
-
-    @defer.inlineCallbacks
     def delete_device(self, user_id, device_id):
         """ Delete the given device
 
@@ -276,6 +402,12 @@ class DeviceHandler(BaseHandler):
             user_id, device_ids, list(hosts)
         )
 
+        for device_id in device_ids:
+            logger.debug(
+                "Notifying about update %r/%r, ID: %r", user_id, device_id,
+                position,
+            )
+
         room_ids = yield self.store.get_rooms_for_user(user_id)
 
         yield self.notifier.on_new_event(
@@ -283,130 +415,10 @@ class DeviceHandler(BaseHandler):
         )
 
         if hosts:
-            logger.info("Sending device list update notif to: %r", hosts)
+            logger.info("Sending device list update notif for %r to: %r", user_id, hosts)
             for host in hosts:
                 self.federation_sender.send_device_messages(host)
 
-    @measure_func("device.get_user_ids_changed")
-    @defer.inlineCallbacks
-    def get_user_ids_changed(self, user_id, from_token):
-        """Get list of users that have had the devices updated, or have newly
-        joined a room, that `user_id` may be interested in.
-
-        Args:
-            user_id (str)
-            from_token (StreamToken)
-        """
-        now_token = yield self.hs.get_event_sources().get_current_token()
-
-        room_ids = yield self.store.get_rooms_for_user(user_id)
-
-        # First we check if any devices have changed
-        changed = yield self.store.get_user_whose_devices_changed(
-            from_token.device_list_key
-        )
-
-        # Then work out if any users have since joined
-        rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
-
-        member_events = yield self.store.get_membership_changes_for_user(
-            user_id, from_token.room_key, now_token.room_key
-        )
-        rooms_changed.update(event.room_id for event in member_events)
-
-        stream_ordering = RoomStreamToken.parse_stream_token(
-            from_token.room_key
-        ).stream
-
-        possibly_changed = set(changed)
-        possibly_left = set()
-        for room_id in rooms_changed:
-            current_state_ids = yield self.store.get_current_state_ids(room_id)
-
-            # The user may have left the room
-            # TODO: Check if they actually did or if we were just invited.
-            if room_id not in room_ids:
-                for key, event_id in iteritems(current_state_ids):
-                    etype, state_key = key
-                    if etype != EventTypes.Member:
-                        continue
-                    possibly_left.add(state_key)
-                continue
-
-            # Fetch the current state at the time.
-            try:
-                event_ids = yield self.store.get_forward_extremeties_for_room(
-                    room_id, stream_ordering=stream_ordering
-                )
-            except errors.StoreError:
-                # we have purged the stream_ordering index since the stream
-                # ordering: treat it the same as a new room
-                event_ids = []
-
-            # special-case for an empty prev state: include all members
-            # in the changed list
-            if not event_ids:
-                for key, event_id in iteritems(current_state_ids):
-                    etype, state_key = key
-                    if etype != EventTypes.Member:
-                        continue
-                    possibly_changed.add(state_key)
-                continue
-
-            current_member_id = current_state_ids.get((EventTypes.Member, user_id))
-            if not current_member_id:
-                continue
-
-            # mapping from event_id -> state_dict
-            prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
-
-            # Check if we've joined the room? If so we just blindly add all the users to
-            # the "possibly changed" users.
-            for state_dict in itervalues(prev_state_ids):
-                member_event = state_dict.get((EventTypes.Member, user_id), None)
-                if not member_event or member_event != current_member_id:
-                    for key, event_id in iteritems(current_state_ids):
-                        etype, state_key = key
-                        if etype != EventTypes.Member:
-                            continue
-                        possibly_changed.add(state_key)
-                    break
-
-            # If there has been any change in membership, include them in the
-            # possibly changed list. We'll check if they are joined below,
-            # and we're not toooo worried about spuriously adding users.
-            for key, event_id in iteritems(current_state_ids):
-                etype, state_key = key
-                if etype != EventTypes.Member:
-                    continue
-
-                # check if this member has changed since any of the extremities
-                # at the stream_ordering, and add them to the list if so.
-                for state_dict in itervalues(prev_state_ids):
-                    prev_event_id = state_dict.get(key, None)
-                    if not prev_event_id or prev_event_id != event_id:
-                        if state_key != user_id:
-                            possibly_changed.add(state_key)
-                        break
-
-        if possibly_changed or possibly_left:
-            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-                user_id
-            )
-
-            # Take the intersection of the users whose devices may have changed
-            # and those that actually still share a room with the user
-            possibly_joined = possibly_changed & users_who_share_room
-            possibly_left = (possibly_changed | possibly_left) - users_who_share_room
-        else:
-            possibly_joined = []
-            possibly_left = []
-
-        defer.returnValue({
-            "changed": list(possibly_joined),
-            "left": list(possibly_left),
-        })
-
     @defer.inlineCallbacks
     def on_federation_query_user_devices(self, user_id):
         stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
@@ -473,15 +485,26 @@ class DeviceListEduUpdater(object):
 
         if get_domain_from_id(user_id) != origin:
             # TODO: Raise?
-            logger.warning("Got device list update edu for %r from %r", user_id, origin)
+            logger.warning(
+                "Got device list update edu for %r/%r from %r",
+                user_id, device_id, origin,
+            )
             return
 
         room_ids = yield self.store.get_rooms_for_user(user_id)
         if not room_ids:
             # We don't share any rooms with this user. Ignore update, as we
             # probably won't get any further updates.
+            logger.warning(
+                "Got device list update edu for %r/%r, but don't share a room",
+                user_id, device_id,
+            )
             return
 
+        logger.debug(
+            "Received device list update for %r/%r", user_id, device_id,
+        )
+
         self._pending_updates.setdefault(user_id, []).append(
             (device_id, stream_id, prev_ids, edu_content)
         )
@@ -499,10 +522,18 @@ class DeviceListEduUpdater(object):
                 # This can happen since we batch updates
                 return
 
+            for device_id, stream_id, prev_ids, content in pending_updates:
+                logger.debug(
+                    "Handling update %r/%r, ID: %r, prev: %r ",
+                    user_id, device_id, stream_id, prev_ids,
+                )
+
             # Given a list of updates we check if we need to resync. This
             # happens if we've missed updates.
             resync = yield self._need_to_do_resync(user_id, pending_updates)
 
+            logger.debug("Need to re-sync devices for %r? %r", user_id, resync)
+
             if resync:
                 # Fetch all devices for the user.
                 origin = get_domain_from_id(user_id)
@@ -555,11 +586,21 @@ class DeviceListEduUpdater(object):
                     )
                     devices = []
 
+                for device in devices:
+                    logger.debug(
+                        "Handling resync update %r/%r, ID: %r",
+                        user_id, device["device_id"], stream_id,
+                    )
+
                 yield self.store.update_remote_device_list_cache(
                     user_id, devices, stream_id,
                 )
                 device_ids = [device["device_id"] for device in devices]
                 yield self.device_handler.notify_device_update(user_id, device_ids)
+
+                # We clobber the seen updates since we've re-synced from a given
+                # point.
+                self._seen_updates[user_id] = set([stream_id])
             else:
                 # Simply update the single device, since we know that is the only
                 # change (because of the single prev_id matching the current cache)
@@ -572,9 +613,9 @@ class DeviceListEduUpdater(object):
                     user_id, [device_id for device_id, _, _, _ in pending_updates]
                 )
 
-            self._seen_updates.setdefault(user_id, set()).update(
-                stream_id for _, stream_id, _, _ in pending_updates
-            )
+                self._seen_updates.setdefault(user_id, set()).update(
+                    stream_id for _, stream_id, _, _ in pending_updates
+                )
 
     @defer.inlineCallbacks
     def _need_to_do_resync(self, user_id, updates):
@@ -587,6 +628,11 @@ class DeviceListEduUpdater(object):
             user_id
         )
 
+        logger.debug(
+            "Current extremity for %r: %r",
+            user_id, extremity,
+        )
+
         stream_id_in_updates = set()  # stream_ids in updates list
         for _, stream_id, prev_ids, _ in updates:
             if not prev_ids:
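
The resync decision itself (the hunk above is cut off) boils down to whether
each incoming update's prev_ids chain back to stream IDs we already know.
A sketch of that check, assuming extremity is the stream ID our cache is at
and updates are the (device_id, stream_id, prev_ids, content) tuples queued
above:

    def need_to_do_resync(extremity, updates, seen_updates):
        """Return True if this batch of updates has a gap we cannot
        bridge from the local cache, forcing a full device-list resync."""
        stream_id_in_updates = set()  # stream_ids seen earlier in the batch
        for _, stream_id, prev_ids, _ in updates:
            if not prev_ids:
                # No previous IDs at all: always resync.
                return True
            for prev_id in prev_ids:
                if prev_id == extremity:
                    continue
                elif prev_id in seen_updates:
                    continue
                elif prev_id in stream_id_in_updates:
                    continue
                else:
                    # A prev_id we know nothing about: we missed an update.
                    return True
            stream_id_in_updates.add(stream_id)
        return False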
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8b113307d2..fe128d9c88 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -44,6 +44,7 @@ class DirectoryHandler(BaseHandler):
         self.appservice_handler = hs.get_application_service_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
         self.config = hs.config
+        self.enable_room_list_search = hs.config.enable_room_list_search
 
         self.federation = hs.get_federation_client()
         hs.get_federation_registry().register_query_handler(
@@ -411,6 +412,13 @@ class DirectoryHandler(BaseHandler):
         if visibility not in ["public", "private"]:
             raise SynapseError(400, "Invalid visibility setting")
 
+        if visibility == "public" and not self.enable_room_list_search:
+            # The room list has been disabled.
+            raise AuthError(
+                403,
+                "This user is not permitted to publish rooms to the room list"
+            )
+
         room = yield self.store.get_room(room_id)
         if room is None:
             raise SynapseError(400, "Unknown room")
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index f772e62c28..d883e98381 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -19,7 +19,7 @@ import random
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError
+from synapse.api.errors import AuthError, SynapseError
 from synapse.events import EventBase
 from synapse.events.utils import serialize_event
 from synapse.types import UserID
@@ -61,6 +61,11 @@ class EventStreamHandler(BaseHandler):
         If `only_keys` is not None, events from keys will be sent down.
         """
 
+        if room_id:
+            blocked = yield self.store.is_room_blocked(room_id)
+            if blocked:
+                raise SynapseError(403, "This room has been blocked on this server")
+
         # send any outstanding server notices to the user.
         yield self._server_notices_sender.on_user_syncing(auth_user_id)
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f80486102a..9eaf2d3e18 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -45,6 +45,7 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.crypto.event_signing import compute_event_signature
+from synapse.event_auth import auth_types_for_event
 from synapse.events.validator import EventValidator
 from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
@@ -858,6 +859,52 @@ class FederationHandler(BaseHandler):
             logger.debug("Not backfilling as no extremeties found.")
             return
 
+        # We only want to paginate if we can actually see the events we'll get,
+        # as otherwise we'll just spend a lot of resources to get redacted
+        # events.
+        #
+        # We do this by filtering all the backwards extremities and seeing if
+        # any remain. Given we don't have the extremity events themselves, we
+        # need to actually check the events that reference them.
+        #
+        # *Note*: the spec wants us to keep backfilling until we reach the start
+        # of the room in case we are allowed to see some of the history. However
+        # in practice that causes more issues than its worth, as a) its
+        # relatively rare for there to be any visible history and b) even when
+        # there is its often sufficiently long ago that clients would stop
+        # attempting to paginate before backfill reached the visible history.
+        #
+        # TODO: If we do do a backfill then we should filter the backwards
+        #   extremities to only include those that point to visible portions of
+        #   history.
+        #
+        # TODO: Correctly handle the case where we are allowed to see the
+        #   forward event but not the backward extremity, e.g. in the case of
+        #   initial join of the server where we are allowed to see the join
+        #   event but not anything before it. This would require looking at the
+        #   state *before* the event, ignoring the special casing certain event
+        #   types have.
+
+        forward_events = yield self.store.get_successor_events(
+            list(extremities),
+        )
+
+        extremities_events = yield self.store.get_events(
+            forward_events,
+            check_redacted=False,
+            get_prev_content=False,
+        )
+
+        # We set `check_history_visibility_only` as we might otherwise get false
+        # positives from users having been erased.
+        filtered_extremities = yield filter_events_for_server(
+            self.store, self.server_name, list(extremities_events.values()),
+            redact=False, check_history_visibility_only=True,
+        )
+
+        if not filtered_extremities:
+            defer.returnValue(False)
+
         # Check if we reached a point where we should start backfilling.
         sorted_extremeties_tuple = sorted(
             extremities.items(),
@@ -1582,6 +1629,7 @@ class FederationHandler(BaseHandler):
             origin, event,
             state=state,
             auth_events=auth_events,
+            backfilled=backfilled,
         )
 
         # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
@@ -1626,6 +1674,7 @@ class FederationHandler(BaseHandler):
                     event,
                     state=ev_info.get("state"),
                     auth_events=ev_info.get("auth_events"),
+                    backfilled=backfilled,
                 )
             defer.returnValue(res)
 
@@ -1748,7 +1797,7 @@ class FederationHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def _prep_event(self, origin, event, state=None, auth_events=None):
+    def _prep_event(self, origin, event, state, auth_events, backfilled):
         """
 
         Args:
@@ -1756,6 +1805,7 @@ class FederationHandler(BaseHandler):
             event:
             state:
             auth_events:
+            backfilled (bool)
 
         Returns:
             Deferred, which resolves to synapse.events.snapshot.EventContext
@@ -1797,12 +1847,100 @@ class FederationHandler(BaseHandler):
 
             context.rejected = RejectedReason.AUTH_ERROR
 
+        if not context.rejected:
+            yield self._check_for_soft_fail(event, state, backfilled)
+
         if event.type == EventTypes.GuestAccess and not context.rejected:
             yield self.maybe_kick_guest_users(event)
 
         defer.returnValue(context)
 
     @defer.inlineCallbacks
+    def _check_for_soft_fail(self, event, state, backfilled):
+        """Checks if we should soft fail the event, if so marks the event as
+        such.
+
+        Args:
+            event (FrozenEvent)
+            state (dict|None): The state at the event if we don't have all the
+                event's prev events
+            backfilled (bool): Whether the event is from backfill
+
+        Returns:
+            Deferred
+        """
+        # For new (non-backfilled and non-outlier) events we check if the event
+        # passes auth based on the current state. If it doesn't then we
+        # "soft-fail" the event.
+        do_soft_fail_check = not backfilled and not event.internal_metadata.is_outlier()
+        if do_soft_fail_check:
+            extrem_ids = yield self.store.get_latest_event_ids_in_room(
+                event.room_id,
+            )
+
+            extrem_ids = set(extrem_ids)
+            prev_event_ids = set(event.prev_event_ids())
+
+            if extrem_ids == prev_event_ids:
+                # If they're the same then the current state is the same as the
+                # state at the event, so no point rechecking auth for soft fail.
+                do_soft_fail_check = False
+
+        if do_soft_fail_check:
+            room_version = yield self.store.get_room_version(event.room_id)
+
+            # Calculate the "current state".
+            if state is not None:
+                # If we're explicitly given the state then we won't have all the
+                # prev events, and so we have a gap in the graph. In this case
+                # we want to be a little careful as we might have been down for
+                # a while and have an incorrect view of the current state,
+                # however we still want to do checks as gaps are easy to
+                # maliciously manufacture.
+                #
+                # So we use a "current state" that is actually a state
+                # resolution across the current forward extremities and the
+                # given state at the event. This should correctly handle cases
+                # like bans, especially with state res v2.
+
+                state_sets = yield self.store.get_state_groups(
+                    event.room_id, extrem_ids,
+                )
+                state_sets = list(state_sets.values())
+                state_sets.append(state)
+                current_state_ids = yield self.state_handler.resolve_events(
+                    room_version, state_sets, event,
+                )
+                current_state_ids = {
+                    k: e.event_id for k, e in iteritems(current_state_ids)
+                }
+            else:
+                current_state_ids = yield self.state_handler.get_current_state_ids(
+                    event.room_id, latest_event_ids=extrem_ids,
+                )
+
+            # Now check if the event passes auth against said current state
+            auth_types = auth_types_for_event(event)
+            current_state_ids = [
+                e for k, e in iteritems(current_state_ids)
+                if k in auth_types
+            ]
+
+            current_auth_events = yield self.store.get_events(current_state_ids)
+            current_auth_events = {
+                (e.type, e.state_key): e for e in current_auth_events.values()
+            }
+
+            try:
+                self.auth.check(room_version, event, auth_events=current_auth_events)
+            except AuthError as e:
+                logger.warn(
+                    "Failed current state auth resolution for %r because %s",
+                    event, e,
+                )
+                event.internal_metadata.soft_failed = True
+
+    @defer.inlineCallbacks
     def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects,
                       missing):
         in_room = yield self.auth.check_host_in_room(
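
auth_types_for_event (newly imported above) yields the (type, state_key)
pairs that can authorise an event; the soft-fail check uses it to narrow the
resolved current state down to just the auth events. A rough sketch of its
shape, omitting third-party invites; see synapse.event_auth for the
authoritative version:

    def auth_types_for_event(event):
        """(type, state_key) pairs whose state events feed into the auth
        checks for this event."""
        if event.type == "m.room.create":
            return []

        auth_types = [
            ("m.room.power_levels", ""),
            ("m.room.member", event.sender),
            ("m.room.create", ""),
        ]

        if event.type == "m.room.member":
            membership = event.content["membership"]
            if membership in ("join", "invite"):
                auth_types.append(("m.room.join_rules", ""))
            auth_types.append(("m.room.member", event.state_key))

        return auth_types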
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 563bb3cea3..7dfae78db0 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -18,7 +18,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes
+from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.handlers.presence import format_user_presence_state
@@ -262,6 +262,10 @@ class InitialSyncHandler(BaseHandler):
             A JSON serialisable dict with the snapshot of the room.
         """
 
+        blocked = yield self.store.is_room_blocked(room_id)
+        if blocked:
+            raise SynapseError(403, "This room has been blocked on this server")
+
         user_id = requester.user.to_string()
 
         membership, member_event_id = yield self._check_in_room_or_world_readable(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3981fe69ce..9b41c7b205 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -243,12 +243,19 @@ class EventCreationHandler(object):
 
         self.spam_checker = hs.get_spam_checker()
 
-        if self.config.block_events_without_consent_error is not None:
+        self._block_events_without_consent_error = (
+            self.config.block_events_without_consent_error
+        )
+
+        # We need to construct a ConsentURIBuilder here, as it checks that the
+        # necessary config options are set, but *only* if we have a configuration
+        # for which we are going to need it.
+        if self._block_events_without_consent_error:
             self._consent_uri_builder = ConsentURIBuilder(self.config)
 
     @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
-                     prev_events_and_hashes=None):
+                     prev_events_and_hashes=None, require_consent=True):
         """
         Given a dict from a client, create a new event.
 
@@ -269,6 +276,9 @@ class EventCreationHandler(object):
                 where *hashes* is a map from algorithm to hash.
 
                 If None, they will be requested from the database.
+
+            require_consent (bool): Whether to check if the requester has
+                consented to the privacy policy.
         Raises:
             ResourceLimitError if server is blocked to some resource being
             exceeded
@@ -310,7 +320,7 @@ class EventCreationHandler(object):
                     )
 
         is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
-        if not is_exempt:
+        if require_consent and not is_exempt:
             yield self.assert_accepted_privacy_policy(requester)
 
         if token_id is not None:
@@ -378,7 +388,7 @@ class EventCreationHandler(object):
         Raises:
             ConsentNotGivenError: if the user has not given consent yet
         """
-        if self.config.block_events_without_consent_error is None:
+        if self._block_events_without_consent_error is None:
             return
 
         # exempt AS users from needing consent
@@ -405,7 +415,7 @@ class EventCreationHandler(object):
         consent_uri = self._consent_uri_builder.build_user_consent_uri(
             requester.user.localpart,
         )
-        msg = self.config.block_events_without_consent_error % {
+        msg = self._block_events_without_consent_error % {
             'consent_uri': consent_uri,
         }
         raise ConsentNotGivenError(
@@ -436,10 +446,11 @@ class EventCreationHandler(object):
 
         if event.is_state():
             prev_state = yield self.deduplicate_state_event(event, context)
-            logger.info(
-                "Not bothering to persist duplicate state event %s", event.event_id,
-            )
             if prev_state is not None:
+                logger.info(
+                    "Not bothering to persist state event %s duplicated by %s",
+                    event.event_id, prev_state.event_id,
+                )
                 defer.returnValue(prev_state)
 
         yield self.handle_new_client_event(
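
assert_accepted_privacy_policy, whose config accesses the hunks above
rewrite, amounts to roughly the following; a sketch, with the lookup of the
user's consented version simplified to an assumed store method:

    @defer.inlineCallbacks
    def assert_accepted_privacy_policy(self, requester):
        """Raise ConsentNotGivenError unless consent is not enforced, the
        user is exempt, or they have accepted the current policy."""
        if self._block_events_without_consent_error is None:
            return  # consent is not enforced on this server

        # exempt AS users from needing consent
        if requester.app_service is not None:
            return

        user_id = requester.user.to_string()

        # Assumed store accessor for the version the user consented to.
        consented = yield self.store.get_user_consent_version(user_id)
        if consented == self.config.user_consent_version:
            return

        consent_uri = self._consent_uri_builder.build_user_consent_uri(
            requester.user.localpart,
        )
        msg = self._block_events_without_consent_error % {
            'consent_uri': consent_uri,
        }
        raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)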
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index ba3856674d..37e87fc054 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -816,7 +816,7 @@ class PresenceHandler(object):
         if self.is_mine(observed_user):
             yield self.invite_presence(observed_user, observer_user)
         else:
-            yield self.federation.send_edu(
+            yield self.federation.build_and_send_edu(
                 destination=observed_user.domain,
                 edu_type="m.presence_invite",
                 content={
@@ -836,7 +836,7 @@ class PresenceHandler(object):
         if self.is_mine(observer_user):
             yield self.accept_presence(observed_user, observer_user)
         else:
-            self.federation.send_edu(
+            self.federation.build_and_send_edu(
                 destination=observer_user.domain,
                 edu_type="m.presence_accept",
                 content={
@@ -848,7 +848,7 @@ class PresenceHandler(object):
             state_dict = yield self.get_state(observed_user, as_event=False)
             state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
 
-            self.federation.send_edu(
+            self.federation.build_and_send_edu(
                 destination=observer_user.domain,
                 edu_type="m.presence",
                 content={
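
The rename suggests the federation layer now distinguishes between queueing a
pre-built EDU and constructing one from its parts. A sketch of the assumed
split (the Edu container and the send_edu queueing signature are assumptions):

    import attr

    @attr.s(slots=True)
    class Edu(object):
        """Assumed shape of a federation EDU; cf. synapse.federation.units."""
        origin = attr.ib()
        destination = attr.ib()
        edu_type = attr.ib()
        content = attr.ib()

    def build_and_send_edu(sender, server_name, destination, edu_type,
                           content, key=None):
        # Build the EDU from its parts, then queue it; the old
        # send_edu(destination, edu_type, content) did both implicitly.
        edu = Edu(
            origin=server_name,
            destination=destination,
            edu_type=edu_type,
            content=content,
        )
        sender.send_edu(edu, key)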
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 1dfbde84fd..a65c98ff5c 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -147,8 +147,14 @@ class BaseProfileHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def set_displayname(self, target_user, requester, new_displayname, by_admin=False):
-        """target_user is the user whose displayname is to be changed;
-        auth_user is the user attempting to make this change."""
+        """Set the displayname of a user
+
+        Args:
+            target_user (UserID): the user whose displayname is to be changed.
+            requester (Requester): The user attempting to make this change.
+            new_displayname (str): The displayname to give this user.
+            by_admin (bool): Whether this change was made by an administrator.
+        """
         if not self.hs.is_mine(target_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 696469732c..274d2946ad 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,10 +16,8 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import get_domain_from_id
-
-from ._base import BaseHandler
+from synapse.handlers._base import BaseHandler
+from synapse.types import ReadReceipt
 
 logger = logging.getLogger(__name__)
 
@@ -39,42 +37,17 @@ class ReceiptsHandler(BaseHandler):
         self.state = hs.get_state_handler()
 
     @defer.inlineCallbacks
-    def received_client_receipt(self, room_id, receipt_type, user_id,
-                                event_id):
-        """Called when a client tells us a local user has read up to the given
-        event_id in the room.
-        """
-        receipt = {
-            "room_id": room_id,
-            "receipt_type": receipt_type,
-            "user_id": user_id,
-            "event_ids": [event_id],
-            "data": {
-                "ts": int(self.clock.time_msec()),
-            }
-        }
-
-        is_new = yield self._handle_new_receipts([receipt])
-
-        if is_new:
-            # fire off a process in the background to send the receipt to
-            # remote servers
-            run_as_background_process(
-                'push_receipts_to_remotes', self._push_remotes, receipt
-            )
-
-    @defer.inlineCallbacks
     def _received_remote_receipt(self, origin, content):
         """Called when we receive an EDU of type m.receipt from a remote HS.
         """
         receipts = [
-            {
-                "room_id": room_id,
-                "receipt_type": receipt_type,
-                "user_id": user_id,
-                "event_ids": user_values["event_ids"],
-                "data": user_values.get("data", {}),
-            }
+            ReadReceipt(
+                room_id=room_id,
+                receipt_type=receipt_type,
+                user_id=user_id,
+                event_ids=user_values["event_ids"],
+                data=user_values.get("data", {}),
+            )
             for room_id, room_values in content.items()
             for receipt_type, users in room_values.items()
             for user_id, user_values in users.items()
@@ -90,14 +63,12 @@ class ReceiptsHandler(BaseHandler):
         max_batch_id = None
 
         for receipt in receipts:
-            room_id = receipt["room_id"]
-            receipt_type = receipt["receipt_type"]
-            user_id = receipt["user_id"]
-            event_ids = receipt["event_ids"]
-            data = receipt["data"]
-
             res = yield self.store.insert_receipt(
-                room_id, receipt_type, user_id, event_ids, data
+                receipt.room_id,
+                receipt.receipt_type,
+                receipt.user_id,
+                receipt.event_ids,
+                receipt.data,
             )
 
             if not res:
@@ -115,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
             # no new receipts
             defer.returnValue(False)
 
-        affected_room_ids = list(set([r["room_id"] for r in receipts]))
+        affected_room_ids = list(set([r.room_id for r in receipts]))
 
         self.notifier.on_new_event(
             "receipt_key", max_batch_id, rooms=affected_room_ids
@@ -128,43 +99,26 @@ class ReceiptsHandler(BaseHandler):
         defer.returnValue(True)
 
     @defer.inlineCallbacks
-    def _push_remotes(self, receipt):
-        """Given a receipt, works out which remote servers should be
-        poked and pokes them.
+    def received_client_receipt(self, room_id, receipt_type, user_id,
+                                event_id):
+        """Called when a client tells us a local user has read up to the given
+        event_id in the room.
         """
-        try:
-            # TODO: optimise this to move some of the work to the workers.
-            room_id = receipt["room_id"]
-            receipt_type = receipt["receipt_type"]
-            user_id = receipt["user_id"]
-            event_ids = receipt["event_ids"]
-            data = receipt["data"]
-
-            users = yield self.state.get_current_user_in_room(room_id)
-            remotedomains = set(get_domain_from_id(u) for u in users)
-            remotedomains = remotedomains.copy()
-            remotedomains.discard(self.server_name)
-
-            logger.debug("Sending receipt to: %r", remotedomains)
-
-            for domain in remotedomains:
-                self.federation.send_edu(
-                    destination=domain,
-                    edu_type="m.receipt",
-                    content={
-                        room_id: {
-                            receipt_type: {
-                                user_id: {
-                                    "event_ids": event_ids,
-                                    "data": data,
-                                }
-                            }
-                        },
-                    },
-                    key=(room_id, receipt_type, user_id),
-                )
-        except Exception:
-            logger.exception("Error pushing receipts to remote servers")
+        receipt = ReadReceipt(
+            room_id=room_id,
+            receipt_type=receipt_type,
+            user_id=user_id,
+            event_ids=[event_id],
+            data={
+                "ts": int(self.clock.time_msec()),
+            },
+        )
+
+        is_new = yield self._handle_new_receipts([receipt])
+        if not is_new:
+            return
+
+        yield self.federation.send_read_receipt(receipt)
 
     @defer.inlineCallbacks
     def get_receipts_for_room(self, room_id, to_key):
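
ReadReceipt (imported from synapse.types above) replaces the ad-hoc receipt
dicts; presumably an attrs container along these lines:

    import attr

    @attr.s(slots=True, frozen=True)
    class ReadReceipt(object):
        """Information about a read receipt (sketch of the assumed type)."""
        room_id = attr.ib()
        receipt_type = attr.ib()   # e.g. "m.read"
        user_id = attr.ib()
        event_ids = attr.ib()      # event IDs the receipt points at
        data = attr.ib()           # e.g. {"ts": <ms timestamp>}

    receipt = ReadReceipt(
        room_id="!room:example.com",
        receipt_type="m.read",
        user_id="@alice:example.com",
        event_ids=["$event:example.com"],
        data={"ts": 1550000000000},
    )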
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index c0e06929bd..58940e0320 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -23,7 +23,9 @@ from synapse.api.constants import LoginType
 from synapse.api.errors import (
     AuthError,
     Codes,
+    ConsentNotGivenError,
     InvalidCaptchaError,
+    LimitExceededError,
     RegistrationError,
     SynapseError,
 )
@@ -60,6 +62,7 @@ class RegistrationHandler(BaseHandler):
         self.user_directory_handler = hs.get_user_directory_handler()
         self.captcha_client = CaptchaServerHttpClient(hs)
         self.identity_handler = self.hs.get_handlers().identity_handler
+        self.ratelimiter = hs.get_registration_ratelimiter()
 
         self._next_generated_user_id = None
 
@@ -149,6 +152,7 @@ class RegistrationHandler(BaseHandler):
         threepid=None,
         user_type=None,
         default_display_name=None,
+        address=None,
     ):
         """Registers a new client on the server.
 
@@ -167,6 +171,7 @@ class RegistrationHandler(BaseHandler):
               api.constants.UserTypes, or None for a normal user.
             default_display_name (unicode|None): if set, the new user's displayname
               will be set to this. Defaults to 'localpart'.
+            address (str|None): the IP address used to perform the registration.
         Returns:
             A tuple of (user_id, access_token).
         Raises:
@@ -206,7 +211,7 @@ class RegistrationHandler(BaseHandler):
             token = None
             if generate_token:
                 token = self.macaroon_gen.generate_access_token(user_id)
-            yield self._register_with_store(
+            yield self.register_with_store(
                 user_id=user_id,
                 token=token,
                 password_hash=password_hash,
@@ -215,6 +220,7 @@ class RegistrationHandler(BaseHandler):
                 create_profile_with_displayname=default_display_name,
                 admin=admin,
                 user_type=user_type,
+                address=address,
             )
 
             if self.hs.config.user_directory_search_all_users:
@@ -238,12 +244,13 @@ class RegistrationHandler(BaseHandler):
                 if default_display_name is None:
                     default_display_name = localpart
                 try:
-                    yield self._register_with_store(
+                    yield self.register_with_store(
                         user_id=user_id,
                         token=token,
                         password_hash=password_hash,
                         make_guest=make_guest,
                         create_profile_with_displayname=default_display_name,
+                        address=address,
                     )
                 except SynapseError:
                     # if user id is taken, just generate another
@@ -305,6 +312,10 @@ class RegistrationHandler(BaseHandler):
                         )
                 else:
                     yield self._join_user_to_room(fake_requester, r)
+            except ConsentNotGivenError as e:
+                # Technically not necessary to pull out this error, though
+                # moving away from bare excepts is a good thing to do.
+                logger.error("Failed to join new user to %r: %r", r, e)
             except Exception as e:
                 logger.error("Failed to join new user to %r: %r", r, e)
 
@@ -337,7 +348,7 @@ class RegistrationHandler(BaseHandler):
             user_id, allowed_appservice=service
         )
 
-        yield self._register_with_store(
+        yield self.register_with_store(
             user_id=user_id,
             password_hash="",
             appservice_id=service_id,
@@ -513,7 +524,7 @@ class RegistrationHandler(BaseHandler):
         token = self.macaroon_gen.generate_access_token(user_id)
 
         if need_register:
-            yield self._register_with_store(
+            yield self.register_with_store(
                 user_id=user_id,
                 token=token,
                 password_hash=password_hash,
@@ -590,10 +601,10 @@ class RegistrationHandler(BaseHandler):
             ratelimit=False,
         )
 
-    def _register_with_store(self, user_id, token=None, password_hash=None,
-                             was_guest=False, make_guest=False, appservice_id=None,
-                             create_profile_with_displayname=None, admin=False,
-                             user_type=None):
+    def register_with_store(self, user_id, token=None, password_hash=None,
+                            was_guest=False, make_guest=False, appservice_id=None,
+                            create_profile_with_displayname=None, admin=False,
+                            user_type=None, address=None):
         """Register user in the datastore.
 
         Args:
@@ -612,10 +623,26 @@ class RegistrationHandler(BaseHandler):
             admin (boolean): is an admin user?
             user_type (str|None): type of user. One of the values from
                 api.constants.UserTypes, or None for a normal user.
+            address (str|None): the IP address used to perform the registration.
 
         Returns:
             Deferred
         """
+        # Don't rate limit for app services
+        if appservice_id is None and address is not None:
+            time_now = self.clock.time()
+
+            allowed, time_allowed = self.ratelimiter.can_do_action(
+                address, time_now_s=time_now,
+                rate_hz=self.hs.config.rc_registration.per_second,
+                burst_count=self.hs.config.rc_registration.burst_count,
+            )
+
+            if not allowed:
+                raise LimitExceededError(
+                    retry_after_ms=int(1000 * (time_allowed - time_now)),
+                )
+
         if self.hs.config.worker_app:
             return self._register_client(
                 user_id=user_id,
@@ -627,6 +654,7 @@ class RegistrationHandler(BaseHandler):
                 create_profile_with_displayname=create_profile_with_displayname,
                 admin=admin,
                 user_type=user_type,
+                address=address,
             )
         else:
             return self.store.register(
@@ -693,9 +721,9 @@ class RegistrationHandler(BaseHandler):
             access_token (str|None): The access token of the newly logged in
                 device, or None if `inhibit_login` enabled.
             bind_email (bool): Whether to bind the email with the identity
-                server
+                server.
             bind_msisdn (bool): Whether to bind the msisdn with the identity
-                server
+                server.
         """
         if self.hs.config.worker_app:
             yield self._post_registration_client(
@@ -737,7 +765,7 @@ class RegistrationHandler(BaseHandler):
         """A user consented to the terms on registration
 
         Args:
-            user_id (str): The user ID that consented
+            user_id (str): The user ID that consented.
             consent_version (str): version of the policy the user has
                 consented to.
         """
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index afa508d729..d6c9d56007 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -44,6 +44,7 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
+        self.enable_room_list_search = hs.config.enable_room_list_search
         self.response_cache = ResponseCache(hs, "room_list")
         self.remote_response_cache = ResponseCache(hs, "remote_room_list",
                                                    timeout_ms=30 * 1000)
@@ -66,10 +67,17 @@ class RoomListHandler(BaseHandler):
                 appservice and network id to use an appservice specific one.
                 Setting to None returns all public rooms across all lists.
         """
+        if not self.enable_room_list_search:
+            return defer.succeed({
+                "chunk": [],
+                "total_room_count_estimate": 0,
+            })
+
         logger.info(
             "Getting public room list: limit=%r, since=%r, search=%r, network=%r",
             limit, since_token, bool(search_filter), network_tuple,
         )
+
         if search_filter:
             # We explicitly don't bother caching searches or requests for
             # appservice specific lists.
@@ -441,6 +449,12 @@ class RoomListHandler(BaseHandler):
     def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
                                     search_filter=None, include_all_networks=False,
                                     third_party_instance_id=None,):
+        if not self.enable_room_list_search:
+            defer.returnValue({
+                "chunk": [],
+                "total_room_count_estimate": 0,
+            })
+
         if search_filter:
             # We currently don't support searching across federation, so we have
             # to do it manually without pagination
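
Both guards above return the same empty-but-well-formed directory response; they differ only in Twisted plumbing. `get_public_room_list` must be a plain function (it uses `return` with a value), so it hands back an already-fired Deferred, while `get_remote_public_room_list` is evidently an `inlineCallbacks` generator and returns via `defer.returnValue`, which raises internally and therefore also ends execution at the guard. A small sketch of the two styles:

    from twisted.internet import defer

    EMPTY_ROOM_LIST = {"chunk": [], "total_room_count_estimate": 0}

    def get_list_plain():
        # A plain function returning a Deferred: wrap the value directly.
        return defer.succeed(EMPTY_ROOM_LIST)

    @defer.inlineCallbacks
    def get_list_generator():
        # In an inlineCallbacks generator, defer.returnValue(x) raises a
        # control-flow exception, so nothing after it runs.
        defer.returnValue(EMPTY_ROOM_LIST)
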
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 190ea2c7b1..71ce5b54e5 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -160,6 +160,7 @@ class RoomMemberHandler(object):
         txn_id=None,
         ratelimit=True,
         content=None,
+        require_consent=True,
     ):
         user_id = target.to_string()
 
@@ -185,6 +186,7 @@ class RoomMemberHandler(object):
             token_id=requester.access_token_id,
             txn_id=txn_id,
             prev_events_and_hashes=prev_events_and_hashes,
+            require_consent=require_consent,
         )
 
         # Check if this event matches the previous membership event for the user.
@@ -232,6 +234,10 @@ class RoomMemberHandler(object):
                 self.copy_room_tags_and_direct_to_room(
                     predecessor["room_id"], room_id, user_id,
                 )
+                # Move over old push rules
+                self.store.move_push_rules_from_room_to_room_for_user(
+                    predecessor["room_id"], room_id, user_id,
+                )
         elif event.membership == Membership.LEAVE:
             if prev_member_event_id:
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
@@ -301,6 +307,7 @@ class RoomMemberHandler(object):
             third_party_signed=None,
             ratelimit=True,
             content=None,
+            require_consent=True,
     ):
         key = (room_id,)
 
@@ -315,6 +322,7 @@ class RoomMemberHandler(object):
                 third_party_signed=third_party_signed,
                 ratelimit=ratelimit,
                 content=content,
+                require_consent=require_consent,
             )
 
         defer.returnValue(result)
@@ -331,6 +339,7 @@ class RoomMemberHandler(object):
             third_party_signed=None,
             ratelimit=True,
             content=None,
+            require_consent=True,
     ):
         content_specified = bool(content)
         if content is None:
@@ -512,6 +521,7 @@ class RoomMemberHandler(object):
             ratelimit=ratelimit,
             prev_events_and_hashes=prev_events_and_hashes,
             content=content,
+            require_consent=require_consent,
         )
         defer.returnValue(res)
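
The `require_consent` flag is threaded through all three layers above and defaults to True, so every existing caller keeps consent enforcement; only call sites that opt out explicitly skip it. A hedged usage sketch (the handler and requester names, and the positional arguments, are assumptions based on the parameters visible in the hunks):

    # Server-initiated membership changes (e.g. removing a guest or a
    # deactivated account from a room) shouldn't be blocked on the user
    # having agreed to the consent policy, so they can opt out:
    yield room_member_handler.update_membership(
        requester,
        target,
        room_id,
        "leave",
        ratelimit=False,
        require_consent=False,  # the parameter added in this change
    )
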
 
diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py
new file mode 100644
index 0000000000..b268bbcb2c
--- /dev/null
+++ b/synapse/handlers/state_deltas.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+logger = logging.getLogger(__name__)
+
+
+class StateDeltasHandler(object):
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
+        """Given two events check if the `key_name` field in content changed
+        from not matching `public_value` to doing so.
+
+        For example, check if `history_visibility` (`key_name`) changed from
+        `shared` to `world_readable` (`public_value`).
+
+        Returns:
+            None if the field in the events either both match `public_value`
+            or if neither do, i.e. there has been no change.
+            True if it didn't match `public_value` but now does
+            False if it did match `public_value` but now doesn't
+        """
+        prev_event = None
+        event = None
+        if prev_event_id:
+            prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+
+        if event_id:
+            event = yield self.store.get_event(event_id, allow_none=True)
+
+        if not event and not prev_event:
+            logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
+            defer.returnValue(None)
+
+        prev_value = None
+        value = None
+
+        if prev_event:
+            prev_value = prev_event.content.get(key_name)
+
+        if event:
+            value = event.content.get(key_name)
+
+        logger.debug("prev_value: %r -> value: %r", prev_value, value)
+
+        if value == public_value and prev_value != public_value:
+            defer.returnValue(True)
+        elif value != public_value and prev_value == public_value:
+            defer.returnValue(False)
+        else:
+            defer.returnValue(None)
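
`_get_key_change`'s tri-state return is easiest to see with concrete values. Here is the comparison it ends with, pulled out as a pure function (no storage access), together with the `history_visibility` example from its docstring:

    def key_change(prev_value, value, public_value):
        # Mirrors the final comparison in _get_key_change above.
        if value == public_value and prev_value != public_value:
            return True   # became public
        if value != public_value and prev_value == public_value:
            return False  # stopped being public
        return None       # no change either way

    assert key_change("shared", "world_readable", "world_readable") is True
    assert key_change("world_readable", "shared", "world_readable") is False
    assert key_change("shared", "joined", "world_readable") is None
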
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index bd97241ab4..57bb996245 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -39,6 +39,9 @@ from synapse.visibility import filter_events_for_client
 
 logger = logging.getLogger(__name__)
 
+# Debug logger for https://github.com/matrix-org/synapse/issues/4422
+issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
+
 
 # Counts the number of times we returned a non-empty sync. `type` is one of
 # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -962,6 +965,15 @@ class SyncHandler(object):
 
         yield self._generate_sync_entry_for_groups(sync_result_builder)
 
+        # debug for https://github.com/matrix-org/synapse/issues/4422
+        for joined_room in sync_result_builder.joined:
+            room_id = joined_room.room_id
+            if room_id in newly_joined_rooms:
+                issue4422_logger.debug(
+                    "Sync result for newly joined room %s: %r",
+                    room_id, joined_room,
+                )
+
         defer.returnValue(SyncResult(
             presence=sync_result_builder.presence,
             account_data=sync_result_builder.account_data,
@@ -1425,6 +1437,17 @@ class SyncHandler(object):
                     old_mem_ev = yield self.store.get_event(
                         old_mem_ev_id, allow_none=True
                     )
+
+                # debug for #4422
+                if has_join:
+                    prev_membership = None
+                    if old_mem_ev:
+                        prev_membership = old_mem_ev.membership
+                    issue4422_logger.debug(
+                        "Previous membership for room %s with join: %s (event %s)",
+                        room_id, prev_membership, old_mem_ev_id,
+                    )
+
                 if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
                     newly_joined_rooms.append(room_id)
 
@@ -1519,30 +1542,39 @@ class SyncHandler(object):
         for room_id in sync_result_builder.joined_room_ids:
             room_entry = room_to_events.get(room_id, None)
 
+            newly_joined = room_id in newly_joined_rooms
             if room_entry:
                 events, start_key = room_entry
 
                 prev_batch_token = now_token.copy_and_replace("room_key", start_key)
 
-                room_entries.append(RoomSyncResultBuilder(
+                entry = RoomSyncResultBuilder(
                     room_id=room_id,
                     rtype="joined",
                     events=events,
-                    newly_joined=room_id in newly_joined_rooms,
+                    newly_joined=newly_joined,
                     full_state=False,
-                    since_token=None if room_id in newly_joined_rooms else since_token,
+                    since_token=None if newly_joined else since_token,
                     upto_token=prev_batch_token,
-                ))
+                )
             else:
-                room_entries.append(RoomSyncResultBuilder(
+                entry = RoomSyncResultBuilder(
                     room_id=room_id,
                     rtype="joined",
                     events=[],
-                    newly_joined=room_id in newly_joined_rooms,
+                    newly_joined=newly_joined,
                     full_state=False,
                     since_token=since_token,
                     upto_token=since_token,
-                ))
+                )
+
+            if newly_joined:
+                # debugging for https://github.com/matrix-org/synapse/issues/4422
+                issue4422_logger.debug(
+                    "RoomSyncResultBuilder events for newly joined room %s: %r",
+                    room_id, entry.events,
+                )
+            room_entries.append(entry)
 
         defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
 
@@ -1663,6 +1695,13 @@ class SyncHandler(object):
             newly_joined_room=newly_joined,
         )
 
+        if newly_joined:
+            # debug for https://github.com/matrix-org/synapse/issues/4422
+            issue4422_logger.debug(
+                "Timeline events after filtering in newly-joined room %s: %r",
+                room_id, batch,
+            )
+
         # When we join the room (or the client requests full_state), we should
         # send down any existing tags. Usually the user won't have tags in a
         # newly joined room, unless either a) they've joined before or b) the
@@ -1894,15 +1933,34 @@ def _calculate_state(
 
 
 class SyncResultBuilder(object):
-    "Used to help build up a new SyncResult for a user"
+    """Used to help build up a new SyncResult for a user
+
+    Attributes:
+        sync_config (SyncConfig)
+        full_state (bool)
+        since_token (StreamToken)
+        now_token (StreamToken)
+        joined_room_ids (list[str])
+
+        # The following mirror the fields in a sync response
+        presence (list)
+        account_data (list)
+        joined (list[JoinedSyncResult])
+        invited (list[InvitedSyncResult])
+        archived (list[ArchivedSyncResult])
+        device (list)
+        groups (GroupsSyncResult|None)
+        to_device (list)
+    """
     def __init__(self, sync_config, full_state, since_token, now_token,
                  joined_room_ids):
         """
         Args:
-            sync_config(SyncConfig)
-            full_state(bool): The full_state flag as specified by user
-            since_token(StreamToken): The token supplied by user, or None.
-            now_token(StreamToken): The token to sync up to.
+            sync_config (SyncConfig)
+            full_state (bool): The full_state flag as specified by user
+            since_token (StreamToken): The token supplied by user, or None.
+            now_token (StreamToken): The token to sync up to.
+            joined_room_ids (list[str]): List of rooms the user is joined to
         """
         self.sync_config = sync_config
         self.full_state = full_state
@@ -1930,8 +1988,8 @@ class RoomSyncResultBuilder(object):
         Args:
             room_id(str)
             rtype(str): One of `"joined"` or `"archived"`
-            events(list): List of events to include in the room, (more events
-                may be added when generating result).
+            events(list[FrozenEvent]): List of events to include in the room
+                (more events may be added when generating result).
             newly_joined(bool): If the user has newly joined the room
             full_state(bool): Whether the full state should be sent in result
             since_token(StreamToken): Earliest point to return events from, or None
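
Because the issue-4422 tracing goes to its own logger name, it can be enabled without turning on DEBUG for the whole sync handler. A minimal stdlib sketch; a real deployment would set the equivalent level in its logging configuration:

    import logging

    # Capture only the targeted tracing added above; the rest of
    # the sync handler's logging stays at its configured level.
    logging.getLogger("synapse.handler.sync.4422_debug").setLevel(logging.DEBUG)
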
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index a61bbf9392..39df960c31 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -231,7 +231,7 @@ class TypingHandler(object):
             for domain in set(get_domain_from_id(u) for u in users):
                 if domain != self.server_name:
                     logger.debug("sending typing update to %s", domain)
-                    self.federation.send_edu(
+                    self.federation.build_and_send_edu(
                         destination=domain,
                         edu_type="m.typing",
                         content={
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 283c6c1b81..b689979b4b 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -15,12 +15,13 @@
 
 import logging
 
-from six import iteritems
+from six import iteritems, iterkeys
 
 from twisted.internet import defer
 
 import synapse.metrics
 from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.handlers.state_deltas import StateDeltasHandler
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.roommember import ProfileInfo
 from synapse.types import get_localpart_from_id
@@ -29,7 +30,7 @@ from synapse.util.metrics import Measure
 logger = logging.getLogger(__name__)
 
 
-class UserDirectoryHandler(object):
+class UserDirectoryHandler(StateDeltasHandler):
     """Handles querying of and keeping updated the user_directory.
 
     N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
@@ -38,19 +39,11 @@ class UserDirectoryHandler(object):
     world_readable or publically joinable room. We keep a database table up to date
     by streaming changes of the current state and recalculating whether users should
     be in the directory or not when necessary.
-
-    For each user in the directory we also store a room_id which is public and that the
-    user is joined to. This allows us to ignore history_visibility and join_rules changes
-    for that user in all other public rooms, as we know they'll still be in at least
-    one public room.
     """
 
-    INITIAL_ROOM_SLEEP_MS = 50
-    INITIAL_ROOM_SLEEP_COUNT = 100
-    INITIAL_ROOM_BATCH_SIZE = 100
-    INITIAL_USER_SLEEP_MS = 10
-
     def __init__(self, hs):
+        super(UserDirectoryHandler, self).__init__(hs)
+
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
         self.server_name = hs.hostname
@@ -59,15 +52,6 @@ class UserDirectoryHandler(object):
         self.is_mine_id = hs.is_mine_id
         self.update_user_directory = hs.config.update_user_directory
         self.search_all_users = hs.config.user_directory_search_all_users
-
-        # When start up for the first time we need to populate the user_directory.
-        # This is a set of user_id's we've inserted already
-        self.initially_handled_users = set()
-        self.initially_handled_users_in_public = set()
-
-        self.initially_handled_users_share = set()
-        self.initially_handled_users_share_private_room = set()
-
         # The current position in the current_state_delta stream
         self.pos = None
 
@@ -130,7 +114,7 @@ class UserDirectoryHandler(object):
         # Support users are for diagnostics and should not appear in the user directory.
         if not is_support:
             yield self.store.update_profile_in_user_dir(
-                user_id, profile.display_name, profile.avatar_url, None
+                user_id, profile.display_name, profile.avatar_url
             )
 
     @defer.inlineCallbacks
@@ -140,7 +124,6 @@ class UserDirectoryHandler(object):
         # FIXME(#3714): We should probably do this in the same worker as all
         # the other changes.
         yield self.store.remove_from_user_dir(user_id)
-        yield self.store.remove_from_user_in_public_room(user_id)
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
@@ -148,10 +131,9 @@ class UserDirectoryHandler(object):
         if self.pos is None:
             self.pos = yield self.store.get_user_directory_stream_pos()
 
-        # If still None then we need to do the initial fill of directory
+        # If still None then the initial background update hasn't happened yet
         if self.pos is None:
-            yield self._do_initial_spam()
-            self.pos = yield self.store.get_user_directory_stream_pos()
+            defer.returnValue(None)
 
         # Loop round handling deltas until we're up to date
         while True:
@@ -173,149 +155,6 @@ class UserDirectoryHandler(object):
                 yield self.store.update_user_directory_stream_pos(self.pos)
 
     @defer.inlineCallbacks
-    def _do_initial_spam(self):
-        """Populates the user_directory from the current state of the DB, used
-        when synapse first starts with user_directory support
-        """
-        new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
-
-        # Delete any existing entries just in case there are any
-        yield self.store.delete_all_from_user_dir()
-
-        # We process by going through each existing room at a time.
-        room_ids = yield self.store.get_all_rooms()
-
-        logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
-        num_processed_rooms = 0
-
-        for room_id in room_ids:
-            logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
-            yield self._handle_initial_room(room_id)
-            num_processed_rooms += 1
-            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
-        logger.info("Processed all rooms.")
-
-        if self.search_all_users:
-            num_processed_users = 0
-            user_ids = yield self.store.get_all_local_users()
-            logger.info(
-                "Doing initial update of user directory. %d users", len(user_ids)
-            )
-            for user_id in user_ids:
-                # We add profiles for all users even if they don't match the
-                # include pattern, just in case we want to change it in future
-                logger.info(
-                    "Handling user %d/%d", num_processed_users + 1, len(user_ids)
-                )
-                yield self._handle_local_user(user_id)
-                num_processed_users += 1
-                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
-
-            logger.info("Processed all users")
-
-        self.initially_handled_users = None
-        self.initially_handled_users_in_public = None
-        self.initially_handled_users_share = None
-        self.initially_handled_users_share_private_room = None
-
-        yield self.store.update_user_directory_stream_pos(new_pos)
-
-    @defer.inlineCallbacks
-    def _handle_initial_room(self, room_id):
-        """Called when we initially fill out user_directory one room at a time
-        """
-        is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
-        if not is_in_room:
-            return
-
-        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
-            room_id
-        )
-
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
-        user_ids = set(users_with_profile)
-        unhandled_users = user_ids - self.initially_handled_users
-
-        yield self.store.add_profiles_to_user_dir(
-            room_id,
-            {user_id: users_with_profile[user_id] for user_id in unhandled_users},
-        )
-
-        self.initially_handled_users |= unhandled_users
-
-        if is_public:
-            yield self.store.add_users_to_public_room(
-                room_id, user_ids=user_ids - self.initially_handled_users_in_public
-            )
-            self.initially_handled_users_in_public |= user_ids
-
-        # We now go and figure out the new users who share rooms with user entries
-        # We sleep aggressively here as otherwise it can starve resources.
-        # We also batch up inserts/updates, but try to avoid too many at once.
-        to_insert = set()
-        to_update = set()
-        count = 0
-        for user_id in user_ids:
-            if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
-            if not self.is_mine_id(user_id):
-                count += 1
-                continue
-
-            if self.store.get_if_app_services_interested_in_user(user_id):
-                count += 1
-                continue
-
-            for other_user_id in user_ids:
-                if user_id == other_user_id:
-                    continue
-
-                if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-                count += 1
-
-                user_set = (user_id, other_user_id)
-
-                if user_set in self.initially_handled_users_share_private_room:
-                    continue
-
-                if user_set in self.initially_handled_users_share:
-                    if is_public:
-                        continue
-                    to_update.add(user_set)
-                else:
-                    to_insert.add(user_set)
-
-                if is_public:
-                    self.initially_handled_users_share.add(user_set)
-                else:
-                    self.initially_handled_users_share_private_room.add(user_set)
-
-                if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
-                    yield self.store.add_users_who_share_room(
-                        room_id, not is_public, to_insert
-                    )
-                    to_insert.clear()
-
-                if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
-                    yield self.store.update_users_who_share_room(
-                        room_id, not is_public, to_update
-                    )
-                    to_update.clear()
-
-        if to_insert:
-            yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
-            to_insert.clear()
-
-        if to_update:
-            yield self.store.update_users_who_share_room(
-                room_id, not is_public, to_update
-            )
-            to_update.clear()
-
-    @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
         """Called with the state deltas to process
         """
@@ -356,6 +195,7 @@ class UserDirectoryHandler(object):
                         user_ids = yield self.store.get_users_in_dir_due_to_room(
                             room_id
                         )
+
                         for user_id in user_ids:
                             yield self._handle_remove_user(room_id, user_id)
                         return
@@ -436,14 +276,20 @@ class UserDirectoryHandler(object):
             # ignore the change
             return
 
-        if change:
-            users_with_profile = yield self.state.get_current_user_in_room(room_id)
-            for user_id, profile in iteritems(users_with_profile):
-                yield self._handle_new_user(room_id, user_id, profile)
-        else:
-            users = yield self.store.get_users_in_public_due_to_room(room_id)
-            for user_id in users:
-                yield self._handle_remove_user(room_id, user_id)
+        users_with_profile = yield self.state.get_current_user_in_room(room_id)
+
+        # Remove every user from the sharing tables for that room.
+        for user_id in iterkeys(users_with_profile):
+            yield self.store.remove_user_who_share_room(user_id, room_id)
+
+        # Then, re-add them to the tables.
+        # NOTE: this is not the most efficient method, as _handle_new_user
+        # sets up local_user -> other_user mappings and, for other local
+        # users, other_user -> local_user mappings; when run over an entire
+        # room this will try to add the same values multiple times. The
+        # batched upserts shouldn't make this too bad, though.
+        for user_id, profile in iteritems(users_with_profile):
+            yield self._handle_new_user(room_id, user_id, profile)
 
     @defer.inlineCallbacks
     def _handle_local_user(self, user_id):
@@ -457,7 +303,9 @@ class UserDirectoryHandler(object):
 
         row = yield self.store.get_user_in_directory(user_id)
         if not row:
-            yield self.store.add_profiles_to_user_dir(None, {user_id: profile})
+            yield self.store.update_profile_in_user_dir(
+                user_id, profile.display_name, profile.avatar_url
+            )
 
     @defer.inlineCallbacks
     def _handle_new_user(self, room_id, user_id, profile):
@@ -469,177 +317,68 @@ class UserDirectoryHandler(object):
         """
         logger.debug("Adding new user to dir, %r", user_id)
 
-        row = yield self.store.get_user_in_directory(user_id)
-        if not row:
-            yield self.store.add_profiles_to_user_dir(room_id, {user_id: profile})
+        yield self.store.update_profile_in_user_dir(
+            user_id, profile.display_name, profile.avatar_url
+        )
 
         is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
             room_id
         )
+        # Now we update which users share rooms with this user.
+        users_with_profile = yield self.state.get_current_user_in_room(room_id)
 
         if is_public:
-            row = yield self.store.get_user_in_public_room(user_id)
-            if not row:
-                yield self.store.add_users_to_public_room(room_id, [user_id])
+            yield self.store.add_users_in_public_rooms(room_id, (user_id,))
         else:
-            logger.debug("Not adding new user to public dir, %r", user_id)
+            to_insert = set()
 
-        # Now we update users who share rooms with users. We do this by getting
-        # all the current users in the room and seeing which aren't already
-        # marked in the database as sharing with `user_id`
+            # First, if they're our user then we need to update for every user
+            if self.is_mine_id(user_id):
 
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
-
-        to_insert = set()
-        to_update = set()
+                is_appservice = self.store.get_if_app_services_interested_in_user(
+                    user_id
+                )
 
-        is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+                # We don't care about appservice users.
+                if not is_appservice:
+                    for other_user_id in users_with_profile:
+                        if user_id == other_user_id:
+                            continue
 
-        # First, if they're our user then we need to update for every user
-        if self.is_mine_id(user_id) and not is_appservice:
-            # Returns a map of other_user_id -> shared_private. We only need
-            # to update mappings if for users that either don't share a room
-            # already (aren't in the map) or, if the room is private, those that
-            # only share a public room.
-            user_ids_shared = yield self.store.get_users_who_share_room_from_dir(
-                user_id
-            )
+                        to_insert.add((user_id, other_user_id))
 
+            # Next we need to update for every local user in the room
             for other_user_id in users_with_profile:
                 if user_id == other_user_id:
                     continue
 
-                shared_is_private = user_ids_shared.get(other_user_id)
-                if shared_is_private is True:
-                    # We've already marked in the database they share a private room
-                    continue
-                elif shared_is_private is False:
-                    # They already share a public room, so only update if this is
-                    # a private room
-                    if not is_public:
-                        to_update.add((user_id, other_user_id))
-                elif shared_is_private is None:
-                    # This is the first time they both share a room
-                    to_insert.add((user_id, other_user_id))
-
-        # Next we need to update for every local user in the room
-        for other_user_id in users_with_profile:
-            if user_id == other_user_id:
-                continue
-
-            is_appservice = self.store.get_if_app_services_interested_in_user(
-                other_user_id
-            )
-            if self.is_mine_id(other_user_id) and not is_appservice:
-                shared_is_private = yield self.store.get_if_users_share_a_room(
-                    other_user_id, user_id
+                is_appservice = self.store.get_if_app_services_interested_in_user(
+                    other_user_id
                 )
-                if shared_is_private is True:
-                    # We've already marked in the database they share a private room
-                    continue
-                elif shared_is_private is False:
-                    # They already share a public room, so only update if this is
-                    # a private room
-                    if not is_public:
-                        to_update.add((other_user_id, user_id))
-                elif shared_is_private is None:
-                    # This is the first time they both share a room
+                if self.is_mine_id(other_user_id) and not is_appservice:
                     to_insert.add((other_user_id, user_id))
 
-        if to_insert:
-            yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
-
-        if to_update:
-            yield self.store.update_users_who_share_room(
-                room_id, not is_public, to_update
-            )
+            if to_insert:
+                yield self.store.add_users_who_share_private_room(room_id, to_insert)
 
     @defer.inlineCallbacks
     def _handle_remove_user(self, room_id, user_id):
-        """Called when we might need to remove user to directory
+        """Called when we might need to remove user from directory
 
         Args:
             room_id (str): room_id that user left or stopped being public that
             user_id (str)
         """
-        logger.debug("Maybe removing user %r", user_id)
-
-        row = yield self.store.get_user_in_directory(user_id)
-        update_user_dir = row and row["room_id"] == room_id
-
-        row = yield self.store.get_user_in_public_room(user_id)
-        update_user_in_public = row and row["room_id"] == room_id
-
-        if update_user_in_public or update_user_dir:
-            # XXX: Make this faster?
-            rooms = yield self.store.get_rooms_for_user(user_id)
-            for j_room_id in rooms:
-                if not update_user_in_public and not update_user_dir:
-                    break
-
-                is_in_room = yield self.store.is_host_joined(
-                    j_room_id, self.server_name
-                )
-
-                if not is_in_room:
-                    continue
+        logger.debug("Removing user %r", user_id)
 
-                if update_user_dir:
-                    update_user_dir = False
-                    yield self.store.update_user_in_user_dir(user_id, j_room_id)
+        # Remove user from sharing tables
+        yield self.store.remove_user_who_share_room(user_id, room_id)
 
-                is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
-                    j_room_id
-                )
+        # Are they still in any rooms? If not, remove them entirely.
+        rooms_user_is_in = yield self.store.get_user_dir_rooms_user_is_in(user_id)
 
-                if update_user_in_public and is_public:
-                    yield self.store.update_user_in_public_user_list(user_id, j_room_id)
-                    update_user_in_public = False
-
-        if update_user_dir:
+        if len(rooms_user_is_in) == 0:
             yield self.store.remove_from_user_dir(user_id)
-        elif update_user_in_public:
-            yield self.store.remove_from_user_in_public_room(user_id)
-
-        # Now handle users_who_share_rooms.
-
-        # Get a list of user tuples that were in the DB due to this room and
-        # users (this includes tuples where the other user matches `user_id`)
-        user_tuples = yield self.store.get_users_in_share_dir_with_room_id(
-            user_id, room_id
-        )
-
-        for user_id, other_user_id in user_tuples:
-            # For each user tuple get a list of rooms that they still share,
-            # trying to find a private room, and update the entry in the DB
-            rooms = yield self.store.get_rooms_in_common_for_users(
-                user_id, other_user_id
-            )
-
-            # If they dont share a room anymore, remove the mapping
-            if not rooms:
-                yield self.store.remove_user_who_share_room(user_id, other_user_id)
-                continue
-
-            found_public_share = None
-            for j_room_id in rooms:
-                is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
-                    j_room_id
-                )
-
-                if is_public:
-                    found_public_share = j_room_id
-                else:
-                    found_public_share = None
-                    yield self.store.update_users_who_share_room(
-                        room_id, not is_public, [(user_id, other_user_id)]
-                    )
-                    break
-
-            if found_public_share:
-                yield self.store.update_users_who_share_room(
-                    room_id, not is_public, [(user_id, other_user_id)]
-                )
 
     @defer.inlineCallbacks
     def _handle_profile_change(self, user_id, room_id, prev_event_id, event_id):
@@ -665,50 +404,4 @@ class UserDirectoryHandler(object):
         new_avatar = event.content.get("avatar_url")
 
         if prev_name != new_name or prev_avatar != new_avatar:
-            yield self.store.update_profile_in_user_dir(
-                user_id, new_name, new_avatar, room_id
-            )
-
-    @defer.inlineCallbacks
-    def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
-        """Given two events check if the `key_name` field in content changed
-        from not matching `public_value` to doing so.
-
-        For example, check if `history_visibility` (`key_name`) changed from
-        `shared` to `world_readable` (`public_value`).
-
-        Returns:
-            None if the field in the events either both match `public_value`
-            or if neither do, i.e. there has been no change.
-            True if it didnt match `public_value` but now does
-            False if it did match `public_value` but now doesn't
-        """
-        prev_event = None
-        event = None
-        if prev_event_id:
-            prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
-
-        if event_id:
-            event = yield self.store.get_event(event_id, allow_none=True)
-
-        if not event and not prev_event:
-            logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
-            defer.returnValue(None)
-
-        prev_value = None
-        value = None
-
-        if prev_event:
-            prev_value = prev_event.content.get(key_name)
-
-        if event:
-            value = event.content.get(key_name)
-
-        logger.debug("prev_value: %r -> value: %r", prev_value, value)
-
-        if value == public_value and prev_value != public_value:
-            defer.returnValue(True)
-        elif value != public_value and prev_value == public_value:
-            defer.returnValue(False)
-        else:
-            defer.returnValue(None)
+            yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
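
Taken together, the rewrite drops the one-public-room-per-user bookkeeping in favour of per-room membership rows: pairs of (user, room) for public rooms and triples of (user, other_user, room) for private rooms, which is why removal becomes a simple delete-then-recount. A rough in-memory model of the semantics the new store calls appear to have (the method names come from the hunks; the exact storage-layer signatures and table shapes are assumptions):

    class SharingTablesSketch(object):
        """Hypothetical in-memory stand-in for the user-directory tables."""

        def __init__(self):
            self.users_in_public_rooms = set()          # (user_id, room_id)
            self.users_who_share_private_rooms = set()  # (user, other, room)

        def add_users_in_public_rooms(self, room_id, user_ids):
            for user_id in user_ids:
                self.users_in_public_rooms.add((user_id, room_id))

        def add_users_who_share_private_room(self, room_id, user_id_tuples):
            for user_id, other_user_id in user_id_tuples:
                self.users_who_share_private_rooms.add(
                    (user_id, other_user_id, room_id)
                )

        def remove_user_who_share_room(self, user_id, room_id):
            # Drop every row tying user_id to room_id, as _handle_remove_user
            # does before checking whether any rooms remain.
            self.users_in_public_rooms.discard((user_id, room_id))
            self.users_who_share_private_rooms = set(
                t for t in self.users_who_share_private_rooms
                if t[2] != room_id or user_id not in (t[0], t[1])
            )

        def get_user_dir_rooms_user_is_in(self, user_id):
            rooms = set(r for (u, r) in self.users_in_public_rooms
                        if u == user_id)
            rooms |= set(t[2] for t in self.users_who_share_private_rooms
                         if user_id in (t[0], t[1]))
            return rooms
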