diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 543bf28aec..feca3e4c10 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
import logging
@@ -159,7 +159,7 @@ class ApplicationServicesHandler(object):
def query_3pe(self, kind, protocol, fields):
services = yield self._get_services_for_3pn(protocol)
- results = yield preserve_context_over_deferred(defer.DeferredList([
+ results = yield make_deferred_yieldable(defer.DeferredList([
preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
for service in services
], consumeErrors=True))
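
For reference, the logcontext-safe fan-out pattern used here — `preserve_fn` to run each call in its own logcontext, then `make_deferred_yieldable` so the combined `DeferredList` can be yielded on without losing the caller's logcontext — looks roughly like this in isolation (a sketch only, not part of this patch; the function name and arguments are illustrative):

    from twisted.internet import defer
    from synapse.util.logcontext import make_deferred_yieldable, preserve_fn

    @defer.inlineCallbacks
    def query_3pe_all(appservice_api, services, kind, protocol, fields):
        # run one query per service, each in its own logcontext
        deferreds = [
            preserve_fn(appservice_api.query_3pe)(service, kind, protocol, fields)
            for service in services
        ]
        # wrap the DeferredList before yielding so the caller's logcontext
        # is restored when the results arrive
        results = yield make_deferred_yieldable(
            defer.DeferredList(deferreds, consumeErrors=True)
        )
        defer.returnValue(results)
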
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 080eb14271..573c9db8a1 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -17,7 +17,10 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.constants import LoginType
-from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError
+from synapse.api.errors import (
+ AuthError, Codes, InteractiveAuthIncompleteError, LoginError, StoreError,
+ SynapseError,
+)
from synapse.module_api import ModuleApi
from synapse.types import UserID
from synapse.util.async import run_on_reactor
@@ -46,7 +49,6 @@ class AuthHandler(BaseHandler):
"""
super(AuthHandler, self).__init__(hs)
self.checkers = {
- LoginType.PASSWORD: self._check_password_auth,
LoginType.RECAPTCHA: self._check_recaptcha,
LoginType.EMAIL_IDENTITY: self._check_email_identity,
LoginType.MSISDN: self._check_msisdn,
@@ -75,15 +77,76 @@ class AuthHandler(BaseHandler):
self.macaroon_gen = hs.get_macaroon_generator()
self._password_enabled = hs.config.password_enabled
- login_types = set()
+ # we keep this as a list despite the O(N^2) implication so that we can
+ # keep PASSWORD first and avoid confusing clients which pick the first
+ # type in the list. (NB that the spec doesn't require us to do so and
+ # clients which favour types that they don't understand over those that
+ # they do are technically broken)
+ login_types = []
if self._password_enabled:
- login_types.add(LoginType.PASSWORD)
+ login_types.append(LoginType.PASSWORD)
for provider in self.password_providers:
if hasattr(provider, "get_supported_login_types"):
- login_types.update(
- provider.get_supported_login_types().keys()
- )
- self._supported_login_types = frozenset(login_types)
+ for t in provider.get_supported_login_types().keys():
+ if t not in login_types:
+ login_types.append(t)
+ self._supported_login_types = login_types
+
+ @defer.inlineCallbacks
+ def validate_user_via_ui_auth(self, requester, request_body, clientip):
+ """
+ Checks that the user is who they claim to be, via a UI auth.
+
+ This is used for things like device deletion and password reset where
+ the user already has a valid access token, but we want to double-check
+ that it isn't stolen by re-authenticating them.
+
+ Args:
+ requester (Requester): The user, as given by the access token
+
+ request_body (dict): The body of the request sent by the client
+
+ clientip (str): The IP address of the client.
+
+ Returns:
+ defer.Deferred[dict]: the parameters for this request (which may
+ have been given only in a previous call).
+
+ Raises:
+ InteractiveAuthIncompleteError if the client has not yet completed
+ any of the permitted login flows
+
+ AuthError if the client has completed a login flow, and it gives
+ a different user to `requester`
+ """
+
+ # build a list of supported flows
+ flows = [
+ [login_type] for login_type in self._supported_login_types
+ ]
+
+ result, params, _ = yield self.check_auth(
+ flows, request_body, clientip,
+ )
+
+ # find the completed login type
+ for login_type in self._supported_login_types:
+ if login_type not in result:
+ continue
+
+ user_id = result[login_type]
+ break
+ else:
+ # this can't happen
+ raise Exception(
+ "check_auth returned True but no successful login type",
+ )
+
+ # check that the UI auth matched the access token
+ if user_id != requester.user.to_string():
+ raise AuthError(403, "Invalid auth")
+
+ defer.returnValue(params)
@defer.inlineCallbacks
def check_auth(self, flows, clientdict, clientip):
@@ -95,26 +158,36 @@ class AuthHandler(BaseHandler):
session with a map, which maps each auth-type (str) to the relevant
identity authenticated by that auth-type (mostly str, but for captcha, bool).
+ If no auth flows have been completed successfully, raises an
+ InteractiveAuthIncompleteError. To handle this, you can use
+ synapse.rest.client.v2_alpha._base.interactive_auth_handler as a
+ decorator.
+
Args:
flows (list): A list of login flows. Each flow is an ordered list of
strings representing auth-types. At least one full
flow must be completed in order for auth to be successful.
+
clientdict: The dictionary from the client root level, not the
'auth' key: this method prompts for auth if none is sent.
+
clientip (str): The IP address of the client.
+
Returns:
- A tuple of (authed, dict, dict, session_id) where authed is true if
- the client has successfully completed an auth flow. If it is true
- the first dict contains the authenticated credentials of each stage.
+ defer.Deferred[dict, dict, str]: a deferred tuple of
+ (creds, params, session_id).
- If authed is false, the first dictionary is the server response to
- the login request and should be passed back to the client.
+ 'creds' contains the authenticated credentials of each stage.
- In either case, the second dict contains the parameters for this
- request (which may have been given only in a previous call).
+ 'params' contains the parameters for this request (which may
+ have been given only in a previous call).
- session_id is the ID of this session, either passed in by the client
- or assigned by the call to check_auth
+ 'session_id' is the ID of this session, either passed in by the
+ client or assigned by this call
+
+ Raises:
+ InteractiveAuthIncompleteError if the client has not yet completed
+ all the stages in any of the permitted flows.
"""
authdict = None
@@ -142,11 +215,8 @@ class AuthHandler(BaseHandler):
clientdict = session['clientdict']
if not authdict:
- defer.returnValue(
- (
- False, self._auth_dict_for_flows(flows, session),
- clientdict, session['id']
- )
+ raise InteractiveAuthIncompleteError(
+ self._auth_dict_for_flows(flows, session),
)
if 'creds' not in session:
@@ -157,14 +227,12 @@ class AuthHandler(BaseHandler):
errordict = {}
if 'type' in authdict:
login_type = authdict['type']
- if login_type not in self.checkers:
- raise LoginError(400, "", Codes.UNRECOGNIZED)
try:
- result = yield self.checkers[login_type](authdict, clientip)
+ result = yield self._check_auth_dict(authdict, clientip)
if result:
creds[login_type] = result
self._save_session(session)
- except LoginError, e:
+ except LoginError as e:
if login_type == LoginType.EMAIL_IDENTITY:
# riot used to have a bug where it would request a new
# validation token (thus sending a new email) each time it
@@ -173,7 +241,7 @@ class AuthHandler(BaseHandler):
#
# Grandfather in the old behaviour for now to avoid
# breaking old riot deployments.
- raise e
+ raise
# this step failed. Merge the error dict into the response
# so that the client can have another go.
@@ -190,12 +258,14 @@ class AuthHandler(BaseHandler):
"Auth completed with creds: %r. Client dict has keys: %r",
creds, clientdict.keys()
)
- defer.returnValue((True, creds, clientdict, session['id']))
+ defer.returnValue((creds, clientdict, session['id']))
ret = self._auth_dict_for_flows(flows, session)
ret['completed'] = creds.keys()
ret.update(errordict)
- defer.returnValue((False, ret, clientdict, session['id']))
+ raise InteractiveAuthIncompleteError(
+ ret,
+ )
@defer.inlineCallbacks
def add_oob_auth(self, stagetype, authdict, clientip):
@@ -268,17 +338,35 @@ class AuthHandler(BaseHandler):
return sess.setdefault('serverdict', {}).get(key, default)
@defer.inlineCallbacks
- def _check_password_auth(self, authdict, _):
- if "user" not in authdict or "password" not in authdict:
- raise LoginError(400, "", Codes.MISSING_PARAM)
+ def _check_auth_dict(self, authdict, clientip):
+ """Attempt to validate the auth dict provided by a client
- user_id = authdict["user"]
- password = authdict["password"]
+ Args:
+ authdict (object): auth dict provided by the client
+ clientip (str): IP address of the client
- (canonical_id, callback) = yield self.validate_login(user_id, {
- "type": LoginType.PASSWORD,
- "password": password,
- })
+ Returns:
+ Deferred: result of the stage verification.
+
+ Raises:
+ StoreError if there was a problem accessing the database
+ SynapseError if there was a problem with the request
+ LoginError if there was an authentication problem.
+ """
+ login_type = authdict['type']
+ checker = self.checkers.get(login_type)
+ if checker is not None:
+ res = yield checker(authdict, clientip)
+ defer.returnValue(res)
+
+ # build a v1-login-style dict out of the authdict and fall back to the
+ # v1 code
+ user_id = authdict.get("user")
+
+ if user_id is None:
+ raise SynapseError(400, "", Codes.MISSING_PARAM)
+
+ (canonical_id, callback) = yield self.validate_login(user_id, authdict)
defer.returnValue(canonical_id)
@defer.inlineCallbacks
@@ -650,41 +738,6 @@ class AuthHandler(BaseHandler):
raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
@defer.inlineCallbacks
- def set_password(self, user_id, newpassword, requester=None):
- password_hash = self.hash(newpassword)
-
- except_access_token_id = requester.access_token_id if requester else None
-
- try:
- yield self.store.user_set_password_hash(user_id, password_hash)
- except StoreError as e:
- if e.code == 404:
- raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
- raise e
- yield self.delete_access_tokens_for_user(
- user_id, except_token_id=except_access_token_id,
- )
- yield self.hs.get_pusherpool().remove_pushers_by_user(
- user_id, except_access_token_id
- )
-
- @defer.inlineCallbacks
- def deactivate_account(self, user_id):
- """Deactivate a user's account
-
- Args:
- user_id (str): ID of user to be deactivated
-
- Returns:
- Deferred
- """
- # FIXME: Theoretically there is a race here wherein user resets
- # password using threepid.
- yield self.delete_access_tokens_for_user(user_id)
- yield self.store.user_delete_threepids(user_id)
- yield self.store.user_set_password_hash(user_id, None)
-
- @defer.inlineCallbacks
def delete_access_token(self, access_token):
"""Invalidate a single access token
@@ -706,6 +759,12 @@ class AuthHandler(BaseHandler):
access_token=access_token,
)
+ # delete pushers associated with this access token
+ if user_info["token_id"] is not None:
+ yield self.hs.get_pusherpool().remove_pushers_by_access_token(
+ str(user_info["user"]), (user_info["token_id"], )
+ )
+
@defer.inlineCallbacks
def delete_access_tokens_for_user(self, user_id, except_token_id=None,
device_id=None):
@@ -728,13 +787,18 @@ class AuthHandler(BaseHandler):
# see if any of our auth providers want to know about this
for provider in self.password_providers:
if hasattr(provider, "on_logged_out"):
- for token, device_id in tokens_and_devices:
+ for token, token_id, device_id in tokens_and_devices:
yield provider.on_logged_out(
user_id=user_id,
device_id=device_id,
access_token=token,
)
+ # delete pushers associated with the access tokens
+ yield self.hs.get_pusherpool().remove_pushers_by_access_token(
+ user_id, (token_id for _, token_id, _ in tokens_and_devices),
+ )
+
@defer.inlineCallbacks
def add_threepid(self, user_id, medium, address, validated_at):
# 'Canonicalise' email addresses down to lower case.
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
new file mode 100644
index 0000000000..b1d3814909
--- /dev/null
+++ b/synapse/handlers/deactivate_account.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from ._base import BaseHandler
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class DeactivateAccountHandler(BaseHandler):
+ """Handler which deals with deactivating user accounts."""
+ def __init__(self, hs):
+ super(DeactivateAccountHandler, self).__init__(hs)
+ self._auth_handler = hs.get_auth_handler()
+ self._device_handler = hs.get_device_handler()
+
+ @defer.inlineCallbacks
+ def deactivate_account(self, user_id):
+ """Deactivate a user's account
+
+ Args:
+ user_id (str): ID of user to be deactivated
+
+ Returns:
+ Deferred
+ """
+ # FIXME: Theoretically there is a race here wherein user resets
+ # password using threepid.
+
+ # first delete any devices belonging to the user, which will also
+ # delete corresponding access tokens.
+ yield self._device_handler.delete_all_devices_for_user(user_id)
+ # then delete any remaining access tokens which weren't associated with
+ # a device.
+ yield self._auth_handler.delete_access_tokens_for_user(user_id)
+
+ yield self.store.user_delete_threepids(user_id)
+ yield self.store.user_set_password_hash(user_id, None)
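
Assuming the rest of the change wires the new handler into the `HomeServer` (the `get_deactivate_account_handler` accessor name below is an assumption; it is not shown in this diff), callers that previously used `AuthHandler.deactivate_account` would now do roughly:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def deactivate(hs, user_id):
        # devices (and their access tokens), any remaining access tokens,
        # threepids and the password hash are all removed by the handler
        yield hs.get_deactivate_account_handler().deactivate_account(user_id)
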
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 579d8477ba..2152efc692 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -171,12 +171,30 @@ class DeviceHandler(BaseHandler):
yield self.notify_device_update(user_id, [device_id])
@defer.inlineCallbacks
+ def delete_all_devices_for_user(self, user_id, except_device_id=None):
+ """Delete all of the user's devices
+
+ Args:
+ user_id (str):
+ except_device_id (str|None): optional device id which should not
+ be deleted
+
+ Returns:
+ defer.Deferred:
+ """
+ device_map = yield self.store.get_devices_by_user(user_id)
+ device_ids = device_map.keys()
+ if except_device_id is not None:
+ device_ids = [d for d in device_ids if d != except_device_id]
+ yield self.delete_devices(user_id, device_ids)
+
+ @defer.inlineCallbacks
def delete_devices(self, user_id, device_ids):
""" Delete several devices
Args:
user_id (str):
- device_ids (str): The list of device IDs to delete
+ device_ids (List[str]): The list of device IDs to delete
Returns:
defer.Deferred:
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index da00aeb0f4..7e5d3f148d 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -375,6 +375,12 @@ class GroupsLocalHandler(object):
def get_publicised_groups_for_user(self, user_id):
if self.hs.is_mine_id(user_id):
result = yield self.store.get_publicised_groups_for_user(user_id)
+
+ # Check AS associated groups for this user - this depends on the
+ # RegExps in the AS registration file (under `users`)
+ for app_service in self.store.get_app_services():
+ result.extend(app_service.get_groups_for_user(user_id))
+
defer.returnValue({"groups": result})
else:
result = yield self.transport_client.get_publicised_groups_for_user(
@@ -415,4 +421,9 @@ class GroupsLocalHandler(object):
uid
)
+ # Check AS associated groups for this user - this depends on the
+ # RegExps in the AS registration file (under `users`)
+ for app_service in self.store.get_app_services():
+ results[uid].extend(app_service.get_groups_for_user(uid))
+
defer.returnValue({"users": results})
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 9718d4abc5..c5267b4b84 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -27,7 +27,7 @@ from synapse.types import (
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -163,7 +163,7 @@ class InitialSyncHandler(BaseHandler):
lambda states: states[event.event_id]
)
- (messages, token), current_state = yield preserve_context_over_deferred(
+ (messages, token), current_state = yield make_deferred_yieldable(
defer.gatherResults(
[
preserve_fn(self.store.get_recent_events_for_room)(
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index fa96ea69cd..cb158ba962 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1199,7 +1199,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
)
changed = True
else:
- # We expect to be poked occaisonally by the other side.
+ # We expect to be poked occasionally by the other side.
# This is to protect against forgetful/buggy servers, so that
# no one gets stuck online forever.
if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 5e5b1952dd..9800e24453 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -36,6 +36,8 @@ class ProfileHandler(BaseHandler):
"profile", self.on_profile_query
)
+ self.user_directory_handler = hs.get_user_directory_handler()
+
self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS)
@defer.inlineCallbacks
@@ -139,6 +141,12 @@ class ProfileHandler(BaseHandler):
target_user.localpart, new_displayname
)
+ if self.hs.config.user_directory_search_all_users:
+ profile = yield self.store.get_profileinfo(target_user.localpart)
+ yield self.user_directory_handler.handle_local_profile_change(
+ target_user.to_string(), profile
+ )
+
yield self._update_join_states(requester, target_user)
@defer.inlineCallbacks
@@ -183,6 +191,12 @@ class ProfileHandler(BaseHandler):
target_user.localpart, new_avatar_url
)
+ if self.hs.config.user_directory_search_all_users:
+ profile = yield self.store.get_profileinfo(target_user.localpart)
+ yield self.user_directory_handler.handle_local_profile_change(
+ target_user.to_string(), profile
+ )
+
yield self._update_join_states(requester, target_user)
@defer.inlineCallbacks
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index f6e7e58563..4bc6ef51fe 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -38,6 +38,7 @@ class RegistrationHandler(BaseHandler):
self.auth = hs.get_auth()
self._auth_handler = hs.get_auth_handler()
self.profile_handler = hs.get_profile_handler()
+ self.user_directory_handler = hs.get_user_directory_handler()
self.captcha_client = CaptchaServerHttpClient(hs)
self._next_generated_user_id = None
@@ -165,6 +166,13 @@ class RegistrationHandler(BaseHandler):
),
admin=admin,
)
+
+ if self.hs.config.user_directory_search_all_users:
+ profile = yield self.store.get_profileinfo(localpart)
+ yield self.user_directory_handler.handle_local_profile_change(
+ user_id, profile
+ )
+
else:
# autogen a sequential user ID
attempts = 0
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 2cf34e51cb..bb40075387 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -154,6 +154,8 @@ class RoomListHandler(BaseHandler):
# We want larger rooms to be first, hence negating num_joined_users
rooms_to_order_value[room_id] = (-num_joined_users, room_id)
+ logger.info("Getting ordering for %i rooms since %s",
+ len(room_ids), stream_token)
yield concurrently_execute(get_order_for_room, room_ids, 10)
sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
@@ -181,34 +183,42 @@ class RoomListHandler(BaseHandler):
rooms_to_scan = rooms_to_scan[:since_token.current_limit]
rooms_to_scan.reverse()
- # Actually generate the entries. _append_room_entry_to_chunk will append to
- # chunk but will stop if len(chunk) > limit
- chunk = []
- if limit and not search_filter:
+ logger.info("After sorting and filtering, %i rooms remain",
+ len(rooms_to_scan))
+
+ # _append_room_entry_to_chunk will append to chunk but will stop if
+ # len(chunk) > limit
+ #
+ # Normally we will generate enough results on the first iteration here,
+ # but if there is a search filter, _append_room_entry_to_chunk may
+ # filter some results out, in which case we loop again.
+ #
+ # We don't want to scan over the entire range either as that
+ # would potentially waste a lot of work.
+ #
+ # XXX if there is no limit, we may end up DoSing the server with
+ # calls to get_current_state_ids for every single room on the
+ # server. Surely we should cap this somehow?
+ #
+ if limit:
step = limit + 1
- for i in xrange(0, len(rooms_to_scan), step):
- # We iterate here because the vast majority of cases we'll stop
- # at first iteration, but occaisonally _append_room_entry_to_chunk
- # won't append to the chunk and so we need to loop again.
- # We don't want to scan over the entire range either as that
- # would potentially waste a lot of work.
- yield concurrently_execute(
- lambda r: self._append_room_entry_to_chunk(
- r, rooms_to_num_joined[r],
- chunk, limit, search_filter
- ),
- rooms_to_scan[i:i + step], 10
- )
- if len(chunk) >= limit + 1:
- break
else:
+ step = len(rooms_to_scan)
+
+ chunk = []
+ for i in xrange(0, len(rooms_to_scan), step):
+ batch = rooms_to_scan[i:i + step]
+ logger.info("Processing %i rooms for result", len(batch))
yield concurrently_execute(
lambda r: self._append_room_entry_to_chunk(
r, rooms_to_num_joined[r],
chunk, limit, search_filter
),
- rooms_to_scan, 5
+ batch, 5,
)
+ logger.info("Now %i rooms in result", len(chunk))
+ if len(chunk) >= limit + 1:
+ break
chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
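
The batching strategy the comments above describe — scan `limit + 1` rooms at a time and stop once the chunk holds one more entry than the requested limit — can be sketched in isolation as follows (illustrative only; assumes a positive `limit` and an `append_entry_to_chunk` callback, standing in for `_append_room_entry_to_chunk`, which may decline to append a room):

    def scan_in_batches(rooms_to_scan, limit, append_entry_to_chunk):
        # Fill `chunk` in batches of limit + 1. The callback may filter a
        # room out (e.g. because of a search filter), which is why more
        # than one batch may be needed.
        step = limit + 1
        chunk = []
        for i in xrange(0, len(rooms_to_scan), step):
            for room_id in rooms_to_scan[i:i + step]:
                append_entry_to_chunk(room_id, chunk)
            # one extra entry tells the caller that a further page exists
            if len(chunk) >= limit + 1:
                break
        return chunk
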
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
new file mode 100644
index 0000000000..44414e1dc1
--- /dev/null
+++ b/synapse/handlers/set_password.py
@@ -0,0 +1,56 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+
+from synapse.api.errors import Codes, StoreError, SynapseError
+from ._base import BaseHandler
+
+logger = logging.getLogger(__name__)
+
+
+class SetPasswordHandler(BaseHandler):
+ """Handler which deals with changing user account passwords"""
+ def __init__(self, hs):
+ super(SetPasswordHandler, self).__init__(hs)
+ self._auth_handler = hs.get_auth_handler()
+ self._device_handler = hs.get_device_handler()
+
+ @defer.inlineCallbacks
+ def set_password(self, user_id, newpassword, requester=None):
+ password_hash = self._auth_handler.hash(newpassword)
+
+ except_device_id = requester.device_id if requester else None
+ except_access_token_id = requester.access_token_id if requester else None
+
+ try:
+ yield self.store.user_set_password_hash(user_id, password_hash)
+ except StoreError as e:
+ if e.code == 404:
+ raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
+ raise e
+
+ # we want to log out all of the user's other sessions. First delete
+ # all of their other devices.
+ yield self._device_handler.delete_all_devices_for_user(
+ user_id, except_device_id=except_device_id,
+ )
+
+ # and now delete any access tokens which weren't associated with
+ # devices (or were associated with this device).
+ yield self._auth_handler.delete_access_tokens_for_user(
+ user_id, except_token_id=except_access_token_id,
+ )
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index b5be5d9623..714f0195c8 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -20,12 +20,13 @@ from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.storage.roommember import ProfileInfo
from synapse.util.metrics import Measure
from synapse.util.async import sleep
+from synapse.types import get_localpart_from_id
logger = logging.getLogger(__name__)
-class UserDirectoyHandler(object):
+class UserDirectoryHandler(object):
"""Handles querying of and keeping updated the user_directory.
N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
@@ -41,9 +42,10 @@ class UserDirectoyHandler(object):
one public room.
"""
- INITIAL_SLEEP_MS = 50
- INITIAL_SLEEP_COUNT = 100
- INITIAL_BATCH_SIZE = 100
+ INITIAL_ROOM_SLEEP_MS = 50
+ INITIAL_ROOM_SLEEP_COUNT = 100
+ INITIAL_ROOM_BATCH_SIZE = 100
+ INITIAL_USER_SLEEP_MS = 10
def __init__(self, hs):
self.store = hs.get_datastore()
@@ -53,6 +55,7 @@ class UserDirectoyHandler(object):
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
self.update_user_directory = hs.config.update_user_directory
+ self.search_all_users = hs.config.user_directory_search_all_users
# When start up for the first time we need to populate the user_directory.
# This is a set of user_id's we've inserted already
@@ -111,6 +114,15 @@ class UserDirectoyHandler(object):
self._is_processing = False
@defer.inlineCallbacks
+ def handle_local_profile_change(self, user_id, profile):
+ """Called to update index of our local user profiles when they change
+ irrespective of any rooms the user may be in.
+ """
+ yield self.store.update_profile_in_user_dir(
+ user_id, profile.display_name, profile.avatar_url, None,
+ )
+
+ @defer.inlineCallbacks
def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
@@ -148,16 +160,30 @@ class UserDirectoyHandler(object):
room_ids = yield self.store.get_all_rooms()
logger.info("Doing initial update of user directory. %d rooms", len(room_ids))
- num_processed_rooms = 1
+ num_processed_rooms = 0
for room_id in room_ids:
- logger.info("Handling room %d/%d", num_processed_rooms, len(room_ids))
+ logger.info("Handling room %d/%d", num_processed_rooms + 1, len(room_ids))
yield self._handle_initial_room(room_id)
num_processed_rooms += 1
- yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+ yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
logger.info("Processed all rooms.")
+ if self.search_all_users:
+ num_processed_users = 0
+ user_ids = yield self.store.get_all_local_users()
+ logger.info("Doing initial update of user directory. %d users", len(user_ids))
+ for user_id in user_ids:
+ # We add profiles for all users even if they don't match the
+ # include pattern, just in case we want to change it in future
+ logger.info("Handling user %d/%d", num_processed_users + 1, len(user_ids))
+ yield self._handle_local_user(user_id)
+ num_processed_users += 1
+ yield sleep(self.INITIAL_USER_SLEEP_MS / 1000.)
+
+ logger.info("Processed all users")
+
self.initially_handled_users = None
self.initially_handled_users_in_public = None
self.initially_handled_users_share = None
@@ -201,8 +227,8 @@ class UserDirectoyHandler(object):
to_update = set()
count = 0
for user_id in user_ids:
- if count % self.INITIAL_SLEEP_COUNT == 0:
- yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+ if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
+ yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
if not self.is_mine_id(user_id):
count += 1
@@ -216,8 +242,8 @@ class UserDirectoyHandler(object):
if user_id == other_user_id:
continue
- if count % self.INITIAL_SLEEP_COUNT == 0:
- yield sleep(self.INITIAL_SLEEP_MS / 1000.)
+ if count % self.INITIAL_ROOM_SLEEP_COUNT == 0:
+ yield sleep(self.INITIAL_ROOM_SLEEP_MS / 1000.)
count += 1
user_set = (user_id, other_user_id)
@@ -237,13 +263,13 @@ class UserDirectoyHandler(object):
else:
self.initially_handled_users_share_private_room.add(user_set)
- if len(to_insert) > self.INITIAL_BATCH_SIZE:
+ if len(to_insert) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.add_users_who_share_room(
room_id, not is_public, to_insert,
)
to_insert.clear()
- if len(to_update) > self.INITIAL_BATCH_SIZE:
+ if len(to_update) > self.INITIAL_ROOM_BATCH_SIZE:
yield self.store.update_users_who_share_room(
room_id, not is_public, to_update,
)
@@ -385,14 +411,28 @@ class UserDirectoyHandler(object):
yield self._handle_remove_user(room_id, user_id)
@defer.inlineCallbacks
+ def _handle_local_user(self, user_id):
+ """Adds a new local roomless user into the user_directory_search table.
+ Used to populate the user index when
+ user_directory_search_all_users is enabled.
+ """
+ logger.debug("Adding new local user to dir, %r", user_id)
+
+ profile = yield self.store.get_profileinfo(get_localpart_from_id(user_id))
+
+ row = yield self.store.get_user_in_directory(user_id)
+ if not row:
+ yield self.store.add_profiles_to_user_dir(None, {user_id: profile})
+
+ @defer.inlineCallbacks
def _handle_new_user(self, room_id, user_id, profile):
"""Called when we might need to add user to directory
Args:
- room_id (str): room_id that user joined or started being public that
+ room_id (str): room_id that user joined or started being public
user_id (str)
"""
- logger.debug("Adding user to dir, %r", user_id)
+ logger.debug("Adding new user to dir, %r", user_id)
row = yield self.store.get_user_in_directory(user_id)
if not row:
@@ -407,7 +447,7 @@ class UserDirectoyHandler(object):
if not row:
yield self.store.add_users_to_public_room(room_id, [user_id])
else:
- logger.debug("Not adding user to public dir, %r", user_id)
+ logger.debug("Not adding new user to public dir, %r", user_id)
# Now we update users who share rooms with users. We do this by getting
# all the current users in the room and seeing which aren't already
|