summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/_base.py1
-rw-r--r--synapse/handlers/auth.py60
-rw-r--r--synapse/handlers/deactivate_account.py1
-rw-r--r--synapse/handlers/directory.py8
-rw-r--r--synapse/handlers/events.py7
-rw-r--r--synapse/handlers/initial_sync.py6
-rw-r--r--synapse/handlers/message.py20
-rw-r--r--synapse/handlers/receipts.py79
-rw-r--r--synapse/handlers/register.py9
-rw-r--r--synapse/handlers/room_list.py14
-rw-r--r--synapse/handlers/room_member.py10
-rw-r--r--synapse/handlers/user_directory.py196
12 files changed, 187 insertions, 224 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index d8d86d6ff3..ac09d03ba9 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -165,6 +165,7 @@ class BaseHandler(object):
                     member_event.room_id,
                     "leave",
                     ratelimit=False,
+                    require_consent=False,
                 )
             except Exception as e:
                 logger.exception("Error kicking guest user: %s" % (e,))
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 2abd9af94f..caad9ae2dd 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -35,6 +35,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
 from synapse.util import logcontext
@@ -99,6 +100,11 @@ class AuthHandler(BaseHandler):
                         login_types.append(t)
         self._supported_login_types = login_types
 
+        self._account_ratelimiter = Ratelimiter()
+        self._failed_attempts_ratelimiter = Ratelimiter()
+
+        self._clock = self.hs.get_clock()
+
     @defer.inlineCallbacks
     def validate_user_via_ui_auth(self, requester, request_body, clientip):
         """
@@ -568,7 +574,12 @@ class AuthHandler(BaseHandler):
         Returns:
             defer.Deferred: (unicode) canonical_user_id, or None if zero or
             multiple matches
+
+        Raises:
+            LimitExceededError if the ratelimiter's login requests count for this
+                user is too high too proceed.
         """
+        self.ratelimit_login_per_account(user_id)
         res = yield self._find_user_id_and_pwd_hash(user_id)
         if res is not None:
             defer.returnValue(res[0])
@@ -634,6 +645,8 @@ class AuthHandler(BaseHandler):
             StoreError if there was a problem accessing the database
             SynapseError if there was a problem with the request
             LoginError if there was an authentication problem.
+            LimitExceededError if the ratelimiter's login requests count for this
+                user is too high too proceed.
         """
 
         if username.startswith('@'):
@@ -643,6 +656,8 @@ class AuthHandler(BaseHandler):
                 username, self.hs.hostname
             ).to_string()
 
+        self.ratelimit_login_per_account(qualified_user_id)
+
         login_type = login_submission.get("type")
         known_login_type = False
 
@@ -715,9 +730,16 @@ class AuthHandler(BaseHandler):
         if not known_login_type:
             raise SynapseError(400, "Unknown login type %s" % login_type)
 
-        # unknown username or invalid password. We raise a 403 here, but note
-        # that if we're doing user-interactive login, it turns all LoginErrors
-        # into a 401 anyway.
+        # unknown username or invalid password.
+        self._failed_attempts_ratelimiter.ratelimit(
+            qualified_user_id.lower(), time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+            update=True,
+        )
+
+        # We raise a 403 here, but note that if we're doing user-interactive
+        # login, it turns all LoginErrors into a 401 anyway.
         raise LoginError(
             403, "Invalid password",
             errcode=Codes.FORBIDDEN
@@ -735,6 +757,10 @@ class AuthHandler(BaseHandler):
             password (unicode): the provided password
         Returns:
             (unicode) the canonical_user_id, or None if unknown user / bad password
+
+        Raises:
+            LimitExceededError if the ratelimiter's login requests count for this
+                user is too high too proceed.
         """
         lookupres = yield self._find_user_id_and_pwd_hash(user_id)
         if not lookupres:
@@ -763,6 +789,7 @@ class AuthHandler(BaseHandler):
             auth_api.validate_macaroon(macaroon, "login", True, user_id)
         except Exception:
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
+        self.ratelimit_login_per_account(user_id)
         yield self.auth.check_auth_blocking(user_id)
         defer.returnValue(user_id)
 
@@ -934,6 +961,33 @@ class AuthHandler(BaseHandler):
         else:
             return defer.succeed(False)
 
+    def ratelimit_login_per_account(self, user_id):
+        """Checks whether the process must be stopped because of ratelimiting.
+
+        Checks against two ratelimiters: the generic one for login attempts per
+        account and the one specific to failed attempts.
+
+        Args:
+            user_id (unicode): complete @user:id
+
+        Raises:
+            LimitExceededError if one of the ratelimiters' login requests count
+                for this user is too high too proceed.
+        """
+        self._failed_attempts_ratelimiter.ratelimit(
+            user_id.lower(), time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+            update=False,
+        )
+
+        self._account_ratelimiter.ratelimit(
+            user_id.lower(), time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_account.per_second,
+            burst_count=self.hs.config.rc_login_account.burst_count,
+            update=True,
+        )
+
 
 @attr.s
 class MacaroonGenerator(object):
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 75fe50c42c..97d3f31d98 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -164,6 +164,7 @@ class DeactivateAccountHandler(BaseHandler):
                     room_id,
                     "leave",
                     ratelimit=False,
+                    require_consent=False,
                 )
             except Exception:
                 logger.exception(
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8b113307d2..fe128d9c88 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -44,6 +44,7 @@ class DirectoryHandler(BaseHandler):
         self.appservice_handler = hs.get_application_service_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
         self.config = hs.config
+        self.enable_room_list_search = hs.config.enable_room_list_search
 
         self.federation = hs.get_federation_client()
         hs.get_federation_registry().register_query_handler(
@@ -411,6 +412,13 @@ class DirectoryHandler(BaseHandler):
         if visibility not in ["public", "private"]:
             raise SynapseError(400, "Invalid visibility setting")
 
+        if visibility == "public" and not self.enable_room_list_search:
+            # The room list has been disabled.
+            raise AuthError(
+                403,
+                "This user is not permitted to publish rooms to the room list"
+            )
+
         room = yield self.store.get_room(room_id)
         if room is None:
             raise SynapseError(400, "Unknown room")
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index f772e62c28..d883e98381 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -19,7 +19,7 @@ import random
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError
+from synapse.api.errors import AuthError, SynapseError
 from synapse.events import EventBase
 from synapse.events.utils import serialize_event
 from synapse.types import UserID
@@ -61,6 +61,11 @@ class EventStreamHandler(BaseHandler):
         If `only_keys` is not None, events from keys will be sent down.
         """
 
+        if room_id:
+            blocked = yield self.store.is_room_blocked(room_id)
+            if blocked:
+                raise SynapseError(403, "This room has been blocked on this server")
+
         # send any outstanding server notices to the user.
         yield self._server_notices_sender.on_user_syncing(auth_user_id)
 
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 563bb3cea3..7dfae78db0 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -18,7 +18,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes
+from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.handlers.presence import format_user_presence_state
@@ -262,6 +262,10 @@ class InitialSyncHandler(BaseHandler):
             A JSON serialisable dict with the snapshot of the room.
         """
 
+        blocked = yield self.store.is_room_blocked(room_id)
+        if blocked:
+            raise SynapseError(403, "This room has been blocked on this server")
+
         user_id = requester.user.to_string()
 
         membership, member_event_id = yield self._check_in_room_or_world_readable(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c762b58902..9b41c7b205 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -243,12 +243,19 @@ class EventCreationHandler(object):
 
         self.spam_checker = hs.get_spam_checker()
 
-        if self.config.block_events_without_consent_error is not None:
+        self._block_events_without_consent_error = (
+            self.config.block_events_without_consent_error
+        )
+
+        # we need to construct a ConsentURIBuilder here, as it checks that the necessary
+        # config options, but *only* if we have a configuration for which we are
+        # going to need it.
+        if self._block_events_without_consent_error:
             self._consent_uri_builder = ConsentURIBuilder(self.config)
 
     @defer.inlineCallbacks
     def create_event(self, requester, event_dict, token_id=None, txn_id=None,
-                     prev_events_and_hashes=None):
+                     prev_events_and_hashes=None, require_consent=True):
         """
         Given a dict from a client, create a new event.
 
@@ -269,6 +276,9 @@ class EventCreationHandler(object):
                 where *hashes* is a map from algorithm to hash.
 
                 If None, they will be requested from the database.
+
+            require_consent (bool): Whether to check if the requester has
+                consented to privacy policy.
         Raises:
             ResourceLimitError if server is blocked to some resource being
             exceeded
@@ -310,7 +320,7 @@ class EventCreationHandler(object):
                     )
 
         is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
-        if not is_exempt:
+        if require_consent and not is_exempt:
             yield self.assert_accepted_privacy_policy(requester)
 
         if token_id is not None:
@@ -378,7 +388,7 @@ class EventCreationHandler(object):
         Raises:
             ConsentNotGivenError: if the user has not given consent yet
         """
-        if self.config.block_events_without_consent_error is None:
+        if self._block_events_without_consent_error is None:
             return
 
         # exempt AS users from needing consent
@@ -405,7 +415,7 @@ class EventCreationHandler(object):
         consent_uri = self._consent_uri_builder.build_user_consent_uri(
             requester.user.localpart,
         )
-        msg = self.config.block_events_without_consent_error % {
+        msg = self._block_events_without_consent_error % {
             'consent_uri': consent_uri,
         }
         raise ConsentNotGivenError(
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 1728089667..274d2946ad 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -16,9 +16,8 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.types import get_domain_from_id
-
-from ._base import BaseHandler
+from synapse.handlers._base import BaseHandler
+from synapse.types import ReadReceipt
 
 logger = logging.getLogger(__name__)
 
@@ -42,13 +41,13 @@ class ReceiptsHandler(BaseHandler):
         """Called when we receive an EDU of type m.receipt from a remote HS.
         """
         receipts = [
-            {
-                "room_id": room_id,
-                "receipt_type": receipt_type,
-                "user_id": user_id,
-                "event_ids": user_values["event_ids"],
-                "data": user_values.get("data", {}),
-            }
+            ReadReceipt(
+                room_id=room_id,
+                receipt_type=receipt_type,
+                user_id=user_id,
+                event_ids=user_values["event_ids"],
+                data=user_values.get("data", {}),
+            )
             for room_id, room_values in content.items()
             for receipt_type, users in room_values.items()
             for user_id, user_values in users.items()
@@ -64,14 +63,12 @@ class ReceiptsHandler(BaseHandler):
         max_batch_id = None
 
         for receipt in receipts:
-            room_id = receipt["room_id"]
-            receipt_type = receipt["receipt_type"]
-            user_id = receipt["user_id"]
-            event_ids = receipt["event_ids"]
-            data = receipt["data"]
-
             res = yield self.store.insert_receipt(
-                room_id, receipt_type, user_id, event_ids, data
+                receipt.room_id,
+                receipt.receipt_type,
+                receipt.user_id,
+                receipt.event_ids,
+                receipt.data,
             )
 
             if not res:
@@ -89,7 +86,7 @@ class ReceiptsHandler(BaseHandler):
             # no new receipts
             defer.returnValue(False)
 
-        affected_room_ids = list(set([r["room_id"] for r in receipts]))
+        affected_room_ids = list(set([r.room_id for r in receipts]))
 
         self.notifier.on_new_event(
             "receipt_key", max_batch_id, rooms=affected_room_ids
@@ -107,49 +104,21 @@ class ReceiptsHandler(BaseHandler):
         """Called when a client tells us a local user has read up to the given
         event_id in the room.
         """
-        receipt = {
-            "room_id": room_id,
-            "receipt_type": receipt_type,
-            "user_id": user_id,
-            "event_ids": [event_id],
-            "data": {
+        receipt = ReadReceipt(
+            room_id=room_id,
+            receipt_type=receipt_type,
+            user_id=user_id,
+            event_ids=[event_id],
+            data={
                 "ts": int(self.clock.time_msec()),
-            }
-        }
+            },
+        )
 
         is_new = yield self._handle_new_receipts([receipt])
         if not is_new:
             return
 
-        # Work out which remote servers should be poked and poke them.
-
-        # TODO: optimise this to move some of the work to the workers.
-        data = receipt["data"]
-
-        # XXX why does this not use state.get_current_hosts_in_room() ?
-        users = yield self.state.get_current_user_in_room(room_id)
-        remotedomains = set(get_domain_from_id(u) for u in users)
-        remotedomains = remotedomains.copy()
-        remotedomains.discard(self.server_name)
-
-        logger.debug("Sending receipt to: %r", remotedomains)
-
-        for domain in remotedomains:
-            self.federation.build_and_send_edu(
-                destination=domain,
-                edu_type="m.receipt",
-                content={
-                    room_id: {
-                        receipt_type: {
-                            user_id: {
-                                "event_ids": [event_id],
-                                "data": data,
-                            }
-                        }
-                    },
-                },
-                key=(room_id, receipt_type, user_id),
-            )
+        yield self.federation.send_read_receipt(receipt)
 
     @defer.inlineCallbacks
     def get_receipts_for_room(self, room_id, to_key):
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 03130edc54..68f73d3793 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -23,6 +23,7 @@ from synapse.api.constants import LoginType
 from synapse.api.errors import (
     AuthError,
     Codes,
+    ConsentNotGivenError,
     InvalidCaptchaError,
     LimitExceededError,
     RegistrationError,
@@ -311,6 +312,10 @@ class RegistrationHandler(BaseHandler):
                         )
                 else:
                     yield self._join_user_to_room(fake_requester, r)
+            except ConsentNotGivenError as e:
+                # Technically not necessary to pull out this error though
+                # moving away from bare excepts is a good thing to do.
+                logger.error("Failed to join new user to %r: %r", r, e)
             except Exception as e:
                 logger.error("Failed to join new user to %r: %r", r, e)
 
@@ -629,8 +634,8 @@ class RegistrationHandler(BaseHandler):
 
             allowed, time_allowed = self.ratelimiter.can_do_action(
                 address, time_now_s=time_now,
-                rate_hz=self.hs.config.rc_registration_requests_per_second,
-                burst_count=self.hs.config.rc_registration_request_burst_count,
+                rate_hz=self.hs.config.rc_registration.per_second,
+                burst_count=self.hs.config.rc_registration.burst_count,
             )
 
             if not allowed:
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index afa508d729..d6c9d56007 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -44,6 +44,7 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
+        self.enable_room_list_search = hs.config.enable_room_list_search
         self.response_cache = ResponseCache(hs, "room_list")
         self.remote_response_cache = ResponseCache(hs, "remote_room_list",
                                                    timeout_ms=30 * 1000)
@@ -66,10 +67,17 @@ class RoomListHandler(BaseHandler):
                 appservice and network id to use an appservice specific one.
                 Setting to None returns all public rooms across all lists.
         """
+        if not self.enable_room_list_search:
+            return defer.succeed({
+                "chunk": [],
+                "total_room_count_estimate": 0,
+            })
+
         logger.info(
             "Getting public room list: limit=%r, since=%r, search=%r, network=%r",
             limit, since_token, bool(search_filter), network_tuple,
         )
+
         if search_filter:
             # We explicitly don't bother caching searches or requests for
             # appservice specific lists.
@@ -441,6 +449,12 @@ class RoomListHandler(BaseHandler):
     def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
                                     search_filter=None, include_all_networks=False,
                                     third_party_instance_id=None,):
+        if not self.enable_room_list_search:
+            defer.returnValue({
+                "chunk": [],
+                "total_room_count_estimate": 0,
+            })
+
         if search_filter:
             # We currently don't support searching across federation, so we have
             # to do it manually without pagination
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 190ea2c7b1..71ce5b54e5 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -160,6 +160,7 @@ class RoomMemberHandler(object):
         txn_id=None,
         ratelimit=True,
         content=None,
+        require_consent=True,
     ):
         user_id = target.to_string()
 
@@ -185,6 +186,7 @@ class RoomMemberHandler(object):
             token_id=requester.access_token_id,
             txn_id=txn_id,
             prev_events_and_hashes=prev_events_and_hashes,
+            require_consent=require_consent,
         )
 
         # Check if this event matches the previous membership event for the user.
@@ -232,6 +234,10 @@ class RoomMemberHandler(object):
                 self.copy_room_tags_and_direct_to_room(
                     predecessor["room_id"], room_id, user_id,
                 )
+                # Move over old push rules
+                self.store.move_push_rules_from_room_to_room_for_user(
+                    predecessor["room_id"], room_id, user_id,
+                )
         elif event.membership == Membership.LEAVE:
             if prev_member_event_id:
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
@@ -301,6 +307,7 @@ class RoomMemberHandler(object):
             third_party_signed=None,
             ratelimit=True,
             content=None,
+            require_consent=True,
     ):
         key = (room_id,)
 
@@ -315,6 +322,7 @@ class RoomMemberHandler(object):
                 third_party_signed=third_party_signed,
                 ratelimit=ratelimit,
                 content=content,
+                require_consent=require_consent,
             )
 
         defer.returnValue(result)
@@ -331,6 +339,7 @@ class RoomMemberHandler(object):
             third_party_signed=None,
             ratelimit=True,
             content=None,
+            require_consent=True,
     ):
         content_specified = bool(content)
         if content is None:
@@ -512,6 +521,7 @@ class RoomMemberHandler(object):
             ratelimit=ratelimit,
             prev_events_and_hashes=prev_events_and_hashes,
             content=content,
+            require_consent=require_consent,
         )
         defer.returnValue(res)
 
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index c21da8343a..7dc0e236e7 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -38,18 +38,8 @@ class UserDirectoryHandler(object):
     world_readable or publically joinable room. We keep a database table up to date
     by streaming changes of the current state and recalculating whether users should
     be in the directory or not when necessary.
-
-    For each user in the directory we also store a room_id which is public and that the
-    user is joined to. This allows us to ignore history_visibility and join_rules changes
-    for that user in all other public rooms, as we know they'll still be in at least
-    one public room.
     """
 
-    INITIAL_ROOM_SLEEP_MS = 50
-    INITIAL_ROOM_SLEEP_COUNT = 100
-    INITIAL_ROOM_BATCH_SIZE = 100
-    INITIAL_USER_SLEEP_MS = 10
-
     def __init__(self, hs):
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
@@ -59,11 +49,6 @@ class UserDirectoryHandler(object):
         self.is_mine_id = hs.is_mine_id
         self.update_user_directory = hs.config.update_user_directory
         self.search_all_users = hs.config.user_directory_search_all_users
-
-        # When start up for the first time we need to populate the user_directory.
-        # This is a set of user_id's we've inserted already
-        self.initially_handled_users = set()
-
         # The current position in the current_state_delta stream
         self.pos = None
 
@@ -126,7 +111,7 @@ class UserDirectoryHandler(object):
         # Support users are for diagnostics and should not appear in the user directory.
         if not is_support:
             yield self.store.update_profile_in_user_dir(
-                user_id, profile.display_name, profile.avatar_url, None
+                user_id, profile.display_name, profile.avatar_url
             )
 
     @defer.inlineCallbacks
@@ -143,10 +128,9 @@ class UserDirectoryHandler(object):
         if self.pos is None:
             self.pos = yield self.store.get_user_directory_stream_pos()
 
-        # If still None then we need to do the initial fill of directory
+        # If still None then the initial background update hasn't happened yet
         if self.pos is None:
-            yield self._do_initial_spam()
-            self.pos = yield self.store.get_user_directory_stream_pos()
+            defer.returnValue(None)
 
         # Loop round handling deltas until we're up to date
         while True:
@@ -168,113 +152,6 @@ class UserDirectoryHandler(object):
                 yield self.store.update_user_directory_stream_pos(self.pos)
 
     @defer.inlineCallbacks
-    def _do_initial_spam(self):
-        """Populates the user_directory from the current state of the DB, used
-        when synapse first starts with user_directory support
-        """
-        new_pos = yield self.store.get_max_stream_id_in_current_state_deltas()
-
-        # Delete any existing entries just in case there are any
-        yield self.store.delete_all_from_user_dir()
-
-        # We process by going through each existing room at a time.
-        room_ids = yield self.store.get_all_rooms()
-
-        logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
-        num_processed_rooms = 0
-
-        for room_id in room_ids:
-            logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
-            yield self._handle_initial_room(room_id)
-            num_processed_rooms += 1
-            yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
-        logger.info("Processed all rooms.")
-
-        if self.search_all_users:
-            num_processed_users = 0
-            user_ids = yield self.store.get_all_local_users()
-            logger.info(
-                "Doing initial update of user directory. %d users", len(user_ids)
-            )
-            for user_id in user_ids:
-                # We add profiles for all users even if they don't match the
-                # include pattern, just in case we want to change it in future
-                logger.info(
-                    "Handling user %d/%d", num_processed_users + 1, len(user_ids)
-                )
-                yield self._handle_local_user(user_id)
-                num_processed_users += 1
-                yield self.clock.sleep(self.INITIAL_USER_SLEEP_MS / 1000.0)
-
-            logger.info("Processed all users")
-
-        self.initially_handled_users = None
-
-        yield self.store.update_user_directory_stream_pos(new_pos)
-
-    @defer.inlineCallbacks
-    def _handle_initial_room(self, room_id):
-        """
-        Called when we initially fill out user_directory one room at a time
-        """
-        is_in_room = yield self.store.is_host_joined(room_id, self.server_name)
-        if not is_in_room:
-            return
-
-        is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
-            room_id
-        )
-
-        users_with_profile = yield self.state.get_current_user_in_room(room_id)
-        user_ids = set(users_with_profile)
-        unhandled_users = user_ids - self.initially_handled_users
-
-        yield self.store.add_profiles_to_user_dir(
-            {user_id: users_with_profile[user_id] for user_id in unhandled_users},
-        )
-
-        self.initially_handled_users |= unhandled_users
-
-        # We now go and figure out the new users who share rooms with user entries
-        # We sleep aggressively here as otherwise it can starve resources.
-        # We also batch up inserts/updates, but try to avoid too many at once.
-        to_insert = set()
-        count = 0
-        for user_id in user_ids:
-            if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-
-            if not self.is_mine_id(user_id):
-                count += 1
-                continue
-
-            if self.store.get_if_app_services_interested_in_user(user_id):
-                count += 1
-                continue
-
-            for other_user_id in user_ids:
-                if user_id == other_user_id:
-                    continue
-
-                if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
-                    yield self.clock.sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.0)
-                count += 1
-
-                user_set = (user_id, other_user_id)
-                to_insert.add(user_set)
-
-                if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
-                    yield self.store.add_users_who_share_room(
-                        room_id, not is_public, to_insert
-                    )
-                    to_insert.clear()
-
-        if to_insert:
-            yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
-            to_insert.clear()
-
-    @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
         """Called with the state deltas to process
         """
@@ -423,7 +300,9 @@ class UserDirectoryHandler(object):
 
         row = yield self.store.get_user_in_directory(user_id)
         if not row:
-            yield self.store.add_profiles_to_user_dir({user_id: profile})
+            yield self.store.update_profile_in_user_dir(
+                user_id, profile.display_name, profile.avatar_url
+            )
 
     @defer.inlineCallbacks
     def _handle_new_user(self, room_id, user_id, profile):
@@ -435,9 +314,9 @@ class UserDirectoryHandler(object):
         """
         logger.debug("Adding new user to dir, %r", user_id)
 
-        row = yield self.store.get_user_in_directory(user_id)
-        if not row:
-            yield self.store.add_profiles_to_user_dir({user_id: profile})
+        yield self.store.update_profile_in_user_dir(
+            user_id, profile.display_name, profile.avatar_url
+        )
 
         is_public = yield self.store.is_room_world_readable_or_publicly_joinable(
             room_id
@@ -445,34 +324,39 @@ class UserDirectoryHandler(object):
         # Now we update users who share rooms with users.
         users_with_profile = yield self.state.get_current_user_in_room(room_id)
 
-        to_insert = set()
+        if is_public:
+            yield self.store.add_users_in_public_rooms(room_id, (user_id,))
+        else:
+            to_insert = set()
 
-        # First, if they're our user then we need to update for every user
-        if self.is_mine_id(user_id):
+            # First, if they're our user then we need to update for every user
+            if self.is_mine_id(user_id):
 
-            is_appservice = self.store.get_if_app_services_interested_in_user(user_id)
+                is_appservice = self.store.get_if_app_services_interested_in_user(
+                    user_id
+                )
 
-            # We don't care about appservice users.
-            if not is_appservice:
-                for other_user_id in users_with_profile:
-                    if user_id == other_user_id:
-                        continue
+                # We don't care about appservice users.
+                if not is_appservice:
+                    for other_user_id in users_with_profile:
+                        if user_id == other_user_id:
+                            continue
 
-                    to_insert.add((user_id, other_user_id))
+                        to_insert.add((user_id, other_user_id))
 
-        # Next we need to update for every local user in the room
-        for other_user_id in users_with_profile:
-            if user_id == other_user_id:
-                continue
+            # Next we need to update for every local user in the room
+            for other_user_id in users_with_profile:
+                if user_id == other_user_id:
+                    continue
 
-            is_appservice = self.store.get_if_app_services_interested_in_user(
-                other_user_id
-            )
-            if self.is_mine_id(other_user_id) and not is_appservice:
-                to_insert.add((other_user_id, user_id))
+                is_appservice = self.store.get_if_app_services_interested_in_user(
+                    other_user_id
+                )
+                if self.is_mine_id(other_user_id) and not is_appservice:
+                    to_insert.add((other_user_id, user_id))
 
-        if to_insert:
-            yield self.store.add_users_who_share_room(room_id, not is_public, to_insert)
+            if to_insert:
+                yield self.store.add_users_who_share_private_room(room_id, to_insert)
 
     @defer.inlineCallbacks
     def _handle_remove_user(self, room_id, user_id):
@@ -487,10 +371,10 @@ class UserDirectoryHandler(object):
         # Remove user from sharing tables
         yield self.store.remove_user_who_share_room(user_id, room_id)
 
-        # Are they still in a room with members? If not, remove them entirely.
-        users_in_room_with = yield self.store.get_users_who_share_room_from_dir(user_id)
+        # Are they still in any rooms? If not, remove them entirely.
+        rooms_user_is_in = yield self.store.get_user_dir_rooms_user_is_in(user_id)
 
-        if len(users_in_room_with) == 0:
+        if len(rooms_user_is_in) == 0:
             yield self.store.remove_from_user_dir(user_id)
 
     @defer.inlineCallbacks
@@ -517,9 +401,7 @@ class UserDirectoryHandler(object):
         new_avatar = event.content.get("avatar_url")
 
         if prev_name != new_name or prev_avatar != new_avatar:
-            yield self.store.update_profile_in_user_dir(
-                user_id, new_name, new_avatar, room_id
-            )
+            yield self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
 
     @defer.inlineCallbacks
     def _get_key_change(self, prev_event_id, event_id, key_name, public_value):