diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index c29c78bd65..d15c6282fb 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -45,6 +45,7 @@ class BaseHandler(object):
self.state_handler = hs.get_state_handler()
self.distributor = hs.get_distributor()
self.ratelimiter = hs.get_ratelimiter()
+ self.admin_redaction_ratelimiter = hs.get_admin_redaction_ratelimiter()
self.clock = hs.get_clock()
self.hs = hs
@@ -53,7 +54,7 @@ class BaseHandler(object):
self.event_builder_factory = hs.get_event_builder_factory()
@defer.inlineCallbacks
- def ratelimit(self, requester, update=True):
+ def ratelimit(self, requester, update=True, is_admin_redaction=False):
"""Ratelimits requests.
Args:
@@ -62,6 +63,9 @@ class BaseHandler(object):
Set to False when doing multiple checks for one request (e.g.
to check up front if we would reject the request), and set to
True for the last call for a given request.
+ is_admin_redaction (bool): Whether this is a room admin/moderator
+ redacting an event. If so then we may apply different
+ ratelimits depending on config.
Raises:
LimitExceededError if the request should be ratelimited
@@ -90,16 +94,33 @@ class BaseHandler(object):
messages_per_second = override.messages_per_second
burst_count = override.burst_count
else:
- messages_per_second = self.hs.config.rc_message.per_second
- burst_count = self.hs.config.rc_message.burst_count
-
- allowed, time_allowed = self.ratelimiter.can_do_action(
- user_id,
- time_now,
- rate_hz=messages_per_second,
- burst_count=burst_count,
- update=update,
- )
+ # We default to different values if this is an admin redaction and
+ # the config is set
+ if is_admin_redaction and self.hs.config.rc_admin_redaction:
+ messages_per_second = self.hs.config.rc_admin_redaction.per_second
+ burst_count = self.hs.config.rc_admin_redaction.burst_count
+ else:
+ messages_per_second = self.hs.config.rc_message.per_second
+ burst_count = self.hs.config.rc_message.burst_count
+
+ if is_admin_redaction and self.hs.config.rc_admin_redaction:
+ # If we have separate config for admin redactions we use a separate
+ # ratelimiter
+ allowed, time_allowed = self.admin_redaction_ratelimiter.can_do_action(
+ user_id,
+ time_now,
+ rate_hz=messages_per_second,
+ burst_count=burst_count,
+ update=update,
+ )
+ else:
+ allowed, time_allowed = self.ratelimiter.can_do_action(
+ user_id,
+ time_now,
+ rate_hz=messages_per_second,
+ burst_count=burst_count,
+ update=update,
+ )
if not allowed:
raise LimitExceededError(
retry_after_ms=int(1000 * (time_allowed - time_now))
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index d0c0142740..333eb30625 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -21,10 +21,8 @@ import unicodedata
import attr
import bcrypt
import pymacaroons
-from canonicaljson import json
from twisted.internet import defer
-from twisted.web.client import PartialDownloadError
import synapse.util.stringutils as stringutils
from synapse.api.constants import LoginType
@@ -38,7 +36,8 @@ from synapse.api.errors import (
UserDeactivatedError,
)
from synapse.api.ratelimiting import Ratelimiter
-from synapse.config.emailconfig import ThreepidBehaviour
+from synapse.handlers.ui_auth import INTERACTIVE_AUTH_CHECKERS
+from synapse.handlers.ui_auth.checkers import UserInteractiveAuthChecker
from synapse.logging.context import defer_to_thread
from synapse.module_api import ModuleApi
from synapse.types import UserID
@@ -58,13 +57,13 @@ class AuthHandler(BaseHandler):
hs (synapse.server.HomeServer):
"""
super(AuthHandler, self).__init__(hs)
- self.checkers = {
- LoginType.RECAPTCHA: self._check_recaptcha,
- LoginType.EMAIL_IDENTITY: self._check_email_identity,
- LoginType.MSISDN: self._check_msisdn,
- LoginType.DUMMY: self._check_dummy_auth,
- LoginType.TERMS: self._check_terms_auth,
- }
+
+ self.checkers = {} # type: dict[str, UserInteractiveAuthChecker]
+ for auth_checker_class in INTERACTIVE_AUTH_CHECKERS:
+ inst = auth_checker_class(hs)
+ if inst.is_enabled():
+ self.checkers[inst.AUTH_TYPE] = inst
+
self.bcrypt_rounds = hs.config.bcrypt_rounds
# This is not a cache per se, but a store of all current sessions that
@@ -158,6 +157,14 @@ class AuthHandler(BaseHandler):
return params
+ def get_enabled_auth_types(self):
+ """Return the enabled user-interactive authentication types
+
+ Returns the UI-Auth types which are supported by the homeserver's current
+ config.
+ """
+ return self.checkers.keys()
+
@defer.inlineCallbacks
def check_auth(self, flows, clientdict, clientip):
"""
@@ -292,7 +299,7 @@ class AuthHandler(BaseHandler):
sess["creds"] = {}
creds = sess["creds"]
- result = yield self.checkers[stagetype](authdict, clientip)
+ result = yield self.checkers[stagetype].check_auth(authdict, clientip)
if result:
creds[stagetype] = result
self._save_session(sess)
@@ -363,7 +370,7 @@ class AuthHandler(BaseHandler):
login_type = authdict["type"]
checker = self.checkers.get(login_type)
if checker is not None:
- res = yield checker(authdict, clientip=clientip)
+ res = yield checker.check_auth(authdict, clientip=clientip)
return res
# build a v1-login-style dict out of the authdict and fall back to the
@@ -376,116 +383,6 @@ class AuthHandler(BaseHandler):
(canonical_id, callback) = yield self.validate_login(user_id, authdict)
return canonical_id
- @defer.inlineCallbacks
- def _check_recaptcha(self, authdict, clientip, **kwargs):
- try:
- user_response = authdict["response"]
- except KeyError:
- # Client tried to provide captcha but didn't give the parameter:
- # bad request.
- raise LoginError(
- 400, "Captcha response is required", errcode=Codes.CAPTCHA_NEEDED
- )
-
- logger.info(
- "Submitting recaptcha response %s with remoteip %s", user_response, clientip
- )
-
- # TODO: get this from the homeserver rather than creating a new one for
- # each request
- try:
- client = self.hs.get_simple_http_client()
- resp_body = yield client.post_urlencoded_get_json(
- self.hs.config.recaptcha_siteverify_api,
- args={
- "secret": self.hs.config.recaptcha_private_key,
- "response": user_response,
- "remoteip": clientip,
- },
- )
- except PartialDownloadError as pde:
- # Twisted is silly
- data = pde.response
- resp_body = json.loads(data)
-
- if "success" in resp_body:
- # Note that we do NOT check the hostname here: we explicitly
- # intend the CAPTCHA to be presented by whatever client the
- # user is using, we just care that they have completed a CAPTCHA.
- logger.info(
- "%s reCAPTCHA from hostname %s",
- "Successful" if resp_body["success"] else "Failed",
- resp_body.get("hostname"),
- )
- if resp_body["success"]:
- return True
- raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
-
- def _check_email_identity(self, authdict, **kwargs):
- return self._check_threepid("email", authdict, **kwargs)
-
- def _check_msisdn(self, authdict, **kwargs):
- return self._check_threepid("msisdn", authdict)
-
- def _check_dummy_auth(self, authdict, **kwargs):
- return defer.succeed(True)
-
- def _check_terms_auth(self, authdict, **kwargs):
- return defer.succeed(True)
-
- @defer.inlineCallbacks
- def _check_threepid(self, medium, authdict, **kwargs):
- if "threepid_creds" not in authdict:
- raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
-
- threepid_creds = authdict["threepid_creds"]
-
- identity_handler = self.hs.get_handlers().identity_handler
-
- logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
- if self.hs.config.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
- threepid = yield identity_handler.threepid_from_creds(threepid_creds)
- elif self.hs.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
- row = yield self.store.get_threepid_validation_session(
- medium,
- threepid_creds["client_secret"],
- sid=threepid_creds["sid"],
- validated=True,
- )
-
- threepid = (
- {
- "medium": row["medium"],
- "address": row["address"],
- "validated_at": row["validated_at"],
- }
- if row
- else None
- )
-
- if row:
- # Valid threepid returned, delete from the db
- yield self.store.delete_threepid_session(threepid_creds["sid"])
- else:
- raise SynapseError(
- 400, "Password resets are not enabled on this homeserver"
- )
-
- if not threepid:
- raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
-
- if threepid["medium"] != medium:
- raise LoginError(
- 401,
- "Expecting threepid of type '%s', got '%s'"
- % (medium, threepid["medium"]),
- errcode=Codes.UNAUTHORIZED,
- )
-
- threepid["threepid_creds"] = authdict["threepid_creds"]
-
- return threepid
-
def _get_params_recaptcha(self):
return {"public_key": self.hs.config.recaptcha_public_key}
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 5f804d1f13..63267a0a4c 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -73,7 +73,9 @@ class DeactivateAccountHandler(BaseHandler):
# unbinding
identity_server_supports_unbinding = True
- threepids = yield self.store.user_get_threepids(user_id)
+ # Retrieve the 3PIDs this user has bound to an identity server
+ threepids = yield self.store.user_get_bound_threepids(user_id)
+
for threepid in threepids:
try:
result = yield self._identity_handler.try_unbind_threepid(
@@ -118,6 +120,10 @@ class DeactivateAccountHandler(BaseHandler):
# parts users from rooms (if it isn't already running)
self._start_user_parting()
+ # Reject all pending invites for the user, so that the user doesn't show up in the
+ # "invited" section of rooms' members list.
+ yield self._reject_pending_invites_for_user(user_id)
+
# Remove all information on the user from the account_validity table.
if self._account_validity_enabled:
yield self.store.delete_account_validity_for_user(user_id)
@@ -127,6 +133,39 @@ class DeactivateAccountHandler(BaseHandler):
return identity_server_supports_unbinding
+ @defer.inlineCallbacks
+ def _reject_pending_invites_for_user(self, user_id):
+ """Reject pending invites addressed to a given user ID.
+
+ Args:
+ user_id (str): The user ID to reject pending invites for.
+ """
+ user = UserID.from_string(user_id)
+ pending_invites = yield self.store.get_invited_rooms_for_user(user_id)
+
+ for room in pending_invites:
+ try:
+ yield self._room_member_handler.update_membership(
+ create_requester(user),
+ user,
+ room.room_id,
+ "leave",
+ ratelimit=False,
+ require_consent=False,
+ )
+ logger.info(
+ "Rejected invite for deactivated user %r in room %r",
+ user_id,
+ room.room_id,
+ )
+ except Exception:
+ logger.exception(
+ "Failed to reject invite for user %r in room %r:"
+ " ignoring and continuing",
+ user_id,
+ room.room_id,
+ )
+
def _start_user_parting(self):
"""
Start the process that goes through the table of users
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 6bf3ef49a8..5ea54f60be 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -329,16 +329,10 @@ class E2eKeysHandler(object):
results = yield self.store.get_e2e_device_keys(local_query)
- # Build the result structure, un-jsonify the results, and add the
- # "unsigned" section
+ # Build the result structure
for user_id, device_keys in results.items():
for device_id, device_info in device_keys.items():
- r = dict(device_info["keys"])
- r["unsigned"] = {}
- display_name = device_info["device_display_name"]
- if display_name is not None:
- r["unsigned"]["device_display_name"] = display_name
- result_dict[user_id][device_id] = r
+ result_dict[user_id][device_id] = device_info
log_kv(results)
return result_dict
@@ -750,7 +744,7 @@ class E2eKeysHandler(object):
)
try:
- stored_device = devices[device_id]["keys"]
+ stored_device = devices[device_id]
except KeyError:
raise NotFoundError("Unknown device")
if self_signing_key_id in stored_device.get("signatures", {}).get(
@@ -800,14 +794,14 @@ class E2eKeysHandler(object):
_, signing_device_id = signing_key_id.split(":", 1)
if (
signing_device_id not in devices
- or signing_key_id not in devices[signing_device_id]["keys"]["keys"]
+ or signing_key_id not in devices[signing_device_id]["keys"]
):
# signed by an unknown device, or the
# device does not have the key
raise SynapseError(400, "Invalid signature", Codes.INVALID_SIGNATURE)
# get the key and check the signature
- pubkey = devices[signing_device_id]["keys"]["keys"][signing_key_id]
+ pubkey = devices[signing_device_id]["keys"][signing_key_id]
verify_key = decode_verify_key_bytes(signing_key_id, decode_base64(pubkey))
_check_device_signature(
user_id, verify_key, signed_master_key, stored_master_key
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index a9d80f708c..0cea445f0d 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -352,8 +352,8 @@ class E2eRoomKeysHandler(object):
A deferred of an empty dict.
"""
if "version" not in version_info:
- raise SynapseError(400, "Missing version in body", Codes.MISSING_PARAM)
- if version_info["version"] != version:
+ version_info["version"] = version
+ elif version_info["version"] != version:
raise SynapseError(
400, "Version in body does not match", Codes.INVALID_PARAM
)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 538b16efd6..57f661f16e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2181,103 +2181,10 @@ class FederationHandler(BaseHandler):
auth_events.update(new_state)
- different_auth = event_auth_events.difference(
- e.event_id for e in auth_events.values()
- )
-
yield self._update_context_for_auth_events(
event, context, auth_events, event_key
)
- if not different_auth:
- # we're done
- return
-
- logger.info(
- "auth_events still refers to events which are not in the calculated auth "
- "chain after state resolution: %s",
- different_auth,
- )
-
- # Only do auth resolution if we have something new to say.
- # We can't prove an auth failure.
- do_resolution = False
-
- for e_id in different_auth:
- if e_id in have_events:
- if have_events[e_id] == RejectedReason.NOT_ANCESTOR:
- do_resolution = True
- break
-
- if not do_resolution:
- logger.info(
- "Skipping auth resolution due to lack of provable rejection reasons"
- )
- return
-
- logger.info("Doing auth resolution")
-
- prev_state_ids = yield context.get_prev_state_ids(self.store)
-
- # 1. Get what we think is the auth chain.
- auth_ids = yield self.auth.compute_auth_events(event, prev_state_ids)
- local_auth_chain = yield self.store.get_auth_chain(auth_ids, include_given=True)
-
- try:
- # 2. Get remote difference.
- try:
- result = yield self.federation_client.query_auth(
- origin, event.room_id, event.event_id, local_auth_chain
- )
- except RequestSendFailed as e:
- # The other side isn't around or doesn't implement the
- # endpoint, so lets just bail out.
- logger.info("Failed to query auth from remote: %s", e)
- return
-
- seen_remotes = yield self.store.have_seen_events(
- [e.event_id for e in result["auth_chain"]]
- )
-
- # 3. Process any remote auth chain events we haven't seen.
- for ev in result["auth_chain"]:
- if ev.event_id in seen_remotes:
- continue
-
- if ev.event_id == event.event_id:
- continue
-
- try:
- auth_ids = ev.auth_event_ids()
- auth = {
- (e.type, e.state_key): e
- for e in result["auth_chain"]
- if e.event_id in auth_ids or event.type == EventTypes.Create
- }
- ev.internal_metadata.outlier = True
-
- logger.debug(
- "do_auth %s different_auth: %s", event.event_id, e.event_id
- )
-
- yield self._handle_new_event(origin, ev, auth_events=auth)
-
- if ev.event_id in event_auth_events:
- auth_events[(ev.type, ev.state_key)] = ev
- except AuthError:
- pass
-
- except Exception:
- # FIXME:
- logger.exception("Failed to query auth chain")
-
- # 4. Look at rejects and their proofs.
- # TODO.
-
- yield self._update_context_for_auth_events(
- event, context, auth_events, event_key
- )
-
@defer.inlineCallbacks
def _update_context_for_auth_events(self, event, context, auth_events, event_key):
"""Update the state_ids in an event context after auth event resolution,
@@ -2444,15 +2351,6 @@ class FederationHandler(BaseHandler):
reason_map[e.event_id] = reason
- if reason == RejectedReason.AUTH_ERROR:
- pass
- elif reason == RejectedReason.REPLACED:
- # TODO: Get proof
- pass
- elif reason == RejectedReason.NOT_ANCESTOR:
- # TODO: Get proof.
- pass
-
logger.debug("construct_auth_difference returning")
return {
@@ -2530,12 +2428,17 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
+ def on_exchange_third_party_invite_request(self, room_id, event_dict):
"""Handle an exchange_third_party_invite request from a remote server
The remote server will call this when it wants to turn a 3pid invite
into a normal m.room.member invite.
+ Args:
+ room_id (str): The ID of the room.
+
+ event_dict (dict[str, Any]): Dictionary containing the event body.
+
Returns:
Deferred: resolves (to None)
"""
@@ -2565,7 +2468,7 @@ class FederationHandler(BaseHandler):
)
try:
- self.auth.check_from_context(room_version, event, context)
+ yield self.auth.check_from_context(room_version, event, context)
except AuthError as e:
logger.warn("Denying third party invite %r because %s", event, e)
raise e
@@ -2594,7 +2497,12 @@ class FederationHandler(BaseHandler):
original_invite_id, allow_none=True
)
if original_invite:
- display_name = original_invite.content["display_name"]
+ # If the m.room.third_party_invite event's content is empty, it means the
+ # invite has been revoked. In this case, we don't have to raise an error here
+ # because the auth check will fail on the invite (because it's not able to
+ # fetch public keys from the m.room.third_party_invite event's content, which
+ # is empty).
+ display_name = original_invite.content.get("display_name")
event_dict["content"]["third_party_invite"]["display_name"] = display_name
else:
logger.info(
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 45db1c1c06..ba99ddf76d 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -18,160 +18,150 @@
"""Utilities for interacting with Identity Servers"""
import logging
+import urllib
from canonicaljson import json
+from signedjson.key import decode_verify_key_bytes
+from signedjson.sign import verify_signed_json
+from unpaddedbase64 import decode_base64
from twisted.internet import defer
+from twisted.internet.error import TimeoutError
from synapse.api.errors import (
+ AuthError,
CodeMessageException,
Codes,
HttpResponseException,
SynapseError,
)
+from synapse.config.emailconfig import ThreepidBehaviour
+from synapse.http.client import SimpleHttpClient
+from synapse.util.hash import sha256_and_url_safe_base64
from synapse.util.stringutils import random_string
from ._base import BaseHandler
logger = logging.getLogger(__name__)
+id_server_scheme = "https://"
+
class IdentityHandler(BaseHandler):
def __init__(self, hs):
super(IdentityHandler, self).__init__(hs)
- self.http_client = hs.get_simple_http_client()
+ self.http_client = SimpleHttpClient(hs)
+ # We create a blacklisting instance of SimpleHttpClient for contacting identity
+ # servers specified by clients
+ self.blacklisting_http_client = SimpleHttpClient(
+ hs, ip_blacklist=hs.config.federation_ip_range_blacklist
+ )
self.federation_http_client = hs.get_http_client()
self.hs = hs
- def _extract_items_from_creds_dict(self, creds):
+ @defer.inlineCallbacks
+ def threepid_from_creds(self, id_server, creds):
"""
- Retrieve entries from a "credentials" dictionary
+ Retrieve and validate a threepid identifier from a "credentials" dictionary against a
+ given identity server
Args:
- creds (dict[str, str]): Dictionary of credentials that contain the following keys:
+ id_server (str): The identity server to validate 3PIDs against. Must be a
+ complete URL including the protocol (http(s)://)
+
+ creds (dict[str, str]): Dictionary containing the following keys:
* client_secret|clientSecret: A unique secret str provided by the client
- * id_server|idServer: the domain of the identity server to query
- * id_access_token: The access token to authenticate to the identity
- server with.
+ * sid: The ID of the validation session
Returns:
- tuple(str, str, str|None): A tuple containing the client_secret, the id_server,
- and the id_access_token value if available.
+ Deferred[dict[str,str|int]|None]: A dictionary consisting of response params to
+ the /getValidated3pid endpoint of the Identity Service API, or None if the
+ threepid was not found
"""
client_secret = creds.get("client_secret") or creds.get("clientSecret")
if not client_secret:
raise SynapseError(
- 400, "No client_secret in creds", errcode=Codes.MISSING_PARAM
+ 400, "Missing param client_secret in creds", errcode=Codes.MISSING_PARAM
)
-
- id_server = creds.get("id_server") or creds.get("idServer")
- if not id_server:
+ session_id = creds.get("sid")
+ if not session_id:
raise SynapseError(
- 400, "No id_server in creds", errcode=Codes.MISSING_PARAM
+ 400, "Missing param session_id in creds", errcode=Codes.MISSING_PARAM
)
- id_access_token = creds.get("id_access_token")
- return client_secret, id_server, id_access_token
+ query_params = {"sid": session_id, "client_secret": client_secret}
- @defer.inlineCallbacks
- def threepid_from_creds(self, creds, use_v2=True):
- """
- Retrieve and validate a threepid identitier from a "credentials" dictionary
-
- Args:
- creds (dict[str, str]): Dictionary of credentials that contain the following keys:
- * client_secret|clientSecret: A unique secret str provided by the client
- * id_server|idServer: the domain of the identity server to query
- * id_access_token: The access token to authenticate to the identity
- server with. Required if use_v2 is true
- use_v2 (bool): Whether to use v2 Identity Service API endpoints
-
- Returns:
- Deferred[dict[str,str|int]|None]: A dictionary consisting of response params to
- the /getValidated3pid endpoint of the Identity Service API, or None if the
- threepid was not found
- """
- client_secret, id_server, id_access_token = self._extract_items_from_creds_dict(
- creds
- )
-
- # If an id_access_token is not supplied, force usage of v1
- if id_access_token is None:
- use_v2 = False
-
- query_params = {"sid": creds["sid"], "client_secret": client_secret}
-
- # Decide which API endpoint URLs and query parameters to use
- if use_v2:
- url = "https://%s%s" % (
- id_server,
- "/_matrix/identity/v2/3pid/getValidated3pid",
- )
- query_params["id_access_token"] = id_access_token
- else:
- url = "https://%s%s" % (
- id_server,
- "/_matrix/identity/api/v1/3pid/getValidated3pid",
- )
+ url = id_server + "/_matrix/identity/api/v1/3pid/getValidated3pid"
try:
data = yield self.http_client.get_json(url, query_params)
- return data if "medium" in data else None
+ except TimeoutError:
+ raise SynapseError(500, "Timed out contacting identity server")
except HttpResponseException as e:
- if e.code != 404 or not use_v2:
- # Generic failure
- logger.info("getValidated3pid failed with Matrix error: %r", e)
- raise e.to_synapse_error()
+ logger.info(
+ "%s returned %i for threepid validation for: %s",
+ id_server,
+ e.code,
+ creds,
+ )
+ return None
+
+ # Old versions of Sydent return a 200 http code even on a failed validation
+ # check. Thus, in addition to the HttpResponseException check above (which
+ # checks for non-200 errors), we need to make sure validation_session isn't
+ # actually an error, identified by the absence of a "medium" key
+ # See https://github.com/matrix-org/sydent/issues/215 for details
+ if "medium" in data:
+ return data
- # This identity server is too old to understand Identity Service API v2
- # Attempt v1 endpoint
- logger.info("Got 404 when POSTing JSON %s, falling back to v1 URL", url)
- return (yield self.threepid_from_creds(creds, use_v2=False))
+ logger.info("%s reported non-validated threepid: %s", id_server, creds)
+ return None
@defer.inlineCallbacks
- def bind_threepid(self, creds, mxid, use_v2=True):
+ def bind_threepid(
+ self, client_secret, sid, mxid, id_server, id_access_token=None, use_v2=True
+ ):
"""Bind a 3PID to an identity server
Args:
- creds (dict[str, str]): Dictionary of credentials that contain the following keys:
- * client_secret|clientSecret: A unique secret str provided by the client
- * id_server|idServer: the domain of the identity server to query
- * id_access_token: The access token to authenticate to the identity
- server with. Required if use_v2 is true
+ client_secret (str): A unique secret provided by the client
+
+ sid (str): The ID of the validation session
+
mxid (str): The MXID to bind the 3PID to
- use_v2 (bool): Whether to use v2 Identity Service API endpoints
+
+ id_server (str): The domain of the identity server to query
+
+ id_access_token (str): The access token to authenticate to the identity
+ server with, if necessary. Required if use_v2 is true
+
+ use_v2 (bool): Whether to use v2 Identity Service API endpoints. Defaults to True
Returns:
Deferred[dict]: The response from the identity server
"""
- logger.debug("binding threepid %r to %s", creds, mxid)
-
- client_secret, id_server, id_access_token = self._extract_items_from_creds_dict(
- creds
- )
-
- sid = creds.get("sid")
- if not sid:
- raise SynapseError(
- 400, "No sid in three_pid_creds", errcode=Codes.MISSING_PARAM
- )
+ logger.debug("Proxying threepid bind request for %s to %s", mxid, id_server)
# If an id_access_token is not supplied, force usage of v1
if id_access_token is None:
use_v2 = False
# Decide which API endpoint URLs to use
+ headers = {}
bind_data = {"sid": sid, "client_secret": client_secret, "mxid": mxid}
if use_v2:
bind_url = "https://%s/_matrix/identity/v2/3pid/bind" % (id_server,)
- bind_data["id_access_token"] = id_access_token
+ headers["Authorization"] = create_id_access_token_header(id_access_token)
else:
bind_url = "https://%s/_matrix/identity/api/v1/3pid/bind" % (id_server,)
try:
- data = yield self.http_client.post_json_get_json(bind_url, bind_data)
- logger.debug("bound threepid %r to %s", creds, mxid)
+ # Use the blacklisting http client as this call is only to identity servers
+ # provided by a client
+ data = yield self.blacklisting_http_client.post_json_get_json(
+ bind_url, bind_data, headers=headers
+ )
# Remember where we bound the threepid
yield self.store.add_user_bound_threepid(
@@ -186,12 +176,17 @@ class IdentityHandler(BaseHandler):
if e.code != 404 or not use_v2:
logger.error("3PID bind failed with Matrix error: %r", e)
raise e.to_synapse_error()
+ except TimeoutError:
+ raise SynapseError(500, "Timed out contacting identity server")
except CodeMessageException as e:
data = json.loads(e.msg) # XXX WAT?
return data
logger.info("Got 404 when POSTing JSON %s, falling back to v1 URL", bind_url)
- return (yield self.bind_threepid(creds, mxid, use_v2=False))
+ res = yield self.bind_threepid(
+ client_secret, sid, mxid, id_server, id_access_token, use_v2=False
+ )
+ return res
@defer.inlineCallbacks
def try_unbind_threepid(self, mxid, threepid):
@@ -267,7 +262,11 @@ class IdentityHandler(BaseHandler):
headers = {b"Authorization": auth_headers}
try:
- yield self.http_client.post_json_get_json(url, content, headers)
+ # Use the blacklisting http client as this call is only to identity servers
+ # provided by a client
+ yield self.blacklisting_http_client.post_json_get_json(
+ url, content, headers
+ )
changed = True
except HttpResponseException as e:
changed = False
@@ -276,7 +275,9 @@ class IdentityHandler(BaseHandler):
logger.warn("Received %d response while unbinding threepid", e.code)
else:
logger.error("Failed to unbind threepid on identity server: %s", e)
- raise SynapseError(502, "Failed to contact identity server")
+ raise SynapseError(500, "Failed to contact identity server")
+ except TimeoutError:
+ raise SynapseError(500, "Timed out contacting identity server")
yield self.store.remove_user_bound_threepid(
user_id=mxid,
@@ -335,6 +336,15 @@ class IdentityHandler(BaseHandler):
# Generate a session id
session_id = random_string(16)
+ if next_link:
+ # Manipulate the next_link to add the sid, because the caller won't get
+ # it until we send a response, by which time we've sent the mail.
+ if "?" in next_link:
+ next_link += "&"
+ else:
+ next_link += "?"
+ next_link += "sid=" + urllib.parse.quote(session_id)
+
# Generate a new validation token
token = random_string(32)
@@ -409,6 +419,8 @@ class IdentityHandler(BaseHandler):
except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e.to_synapse_error()
+ except TimeoutError:
+ raise SynapseError(500, "Timed out contacting identity server")
@defer.inlineCallbacks
def requestMsisdnToken(
@@ -457,7 +469,476 @@ class IdentityHandler(BaseHandler):
id_server + "/_matrix/identity/api/v1/validate/msisdn/requestToken",
params,
)
- return data
except HttpResponseException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e.to_synapse_error()
+ except TimeoutError:
+ raise SynapseError(500, "Timed out contacting identity server")
+
+ assert self.hs.config.public_baseurl
+
+ # we need to tell the client to send the token back to us, since it doesn't
+ # otherwise know where to send it, so add submit_url response parameter
+ # (see also MSC2078)
+ data["submit_url"] = (
+ self.hs.config.public_baseurl
+ + "_matrix/client/unstable/add_threepid/msisdn/submit_token"
+ )
+ return data
+
+ @defer.inlineCallbacks
+ def validate_threepid_session(self, client_secret, sid):
+ """Validates a threepid session with only the client secret and session ID
+ Tries validating against any configured account_threepid_delegates as well as locally.
+
+ Args:
+ client_secret (str): A secret provided by the client
+
+ sid (str): The ID of the session
+
+ Returns:
+ Dict[str, str|int] if validation was successful, otherwise None
+ """
+ # XXX: We shouldn't need to keep wrapping and unwrapping this value
+ threepid_creds = {"client_secret": client_secret, "sid": sid}
+
+ # We don't actually know which medium this 3PID is. Thus we first assume it's email,
+ # and if validation fails we try msisdn
+ validation_session = None
+
+ # Try to validate as email
+ if self.hs.config.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
+ # Ask our delegated email identity server
+ validation_session = yield self.threepid_from_creds(
+ self.hs.config.account_threepid_delegate_email, threepid_creds
+ )
+ elif self.hs.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
+ # Get a validated session matching these details
+ validation_session = yield self.store.get_threepid_validation_session(
+ "email", client_secret, sid=sid, validated=True
+ )
+
+ if validation_session:
+ return validation_session
+
+ # Try to validate as msisdn
+ if self.hs.config.account_threepid_delegate_msisdn:
+ # Ask our delegated msisdn identity server
+ validation_session = yield self.threepid_from_creds(
+ self.hs.config.account_threepid_delegate_msisdn, threepid_creds
+ )
+
+ return validation_session
+
+ @defer.inlineCallbacks
+ def proxy_msisdn_submit_token(self, id_server, client_secret, sid, token):
+ """Proxy a POST submitToken request to an identity server for verification purposes
+
+ Args:
+ id_server (str): The identity server URL to contact
+
+ client_secret (str): Secret provided by the client
+
+ sid (str): The ID of the session
+
+ token (str): The verification token
+
+ Raises:
+ SynapseError: If we failed to contact the identity server
+
+ Returns:
+ Deferred[dict]: The response dict from the identity server
+ """
+ body = {"client_secret": client_secret, "sid": sid, "token": token}
+
+ try:
+ return (
+ yield self.http_client.post_json_get_json(
+ id_server + "/_matrix/identity/api/v1/validate/msisdn/submitToken",
+ body,
+ )
+ )
+ except TimeoutError:
+ raise SynapseError(500, "Timed out contacting identity server")
+ except HttpResponseException as e:
+ logger.warning("Error contacting msisdn account_threepid_delegate: %s", e)
+ raise SynapseError(400, "Error contacting the identity server")
+
+ @defer.inlineCallbacks
+ def lookup_3pid(self, id_server, medium, address, id_access_token=None):
+ """Looks up a 3pid in the passed identity server.
+
+ Args:
+ id_server (str): The server name (including port, if required)
+ of the identity server to use.
+ medium (str): The type of the third party identifier (e.g. "email").
+ address (str): The third party identifier (e.g. "foo@example.com").
+ id_access_token (str|None): The access token to authenticate to the identity
+ server with
+
+ Returns:
+ str|None: the matrix ID of the 3pid, or None if it is not recognized.
+ """
+ if id_access_token is not None:
+ try:
+ results = yield self._lookup_3pid_v2(
+ id_server, id_access_token, medium, address
+ )
+ return results
+
+ except Exception as e:
+ # Catch HttpResponseExcept for a non-200 response code
+ # Check if this identity server does not know about v2 lookups
+ if isinstance(e, HttpResponseException) and e.code == 404:
+ # This is an old identity server that does not yet support v2 lookups
+ logger.warning(
+ "Attempted v2 lookup on v1 identity server %s. Falling "
+ "back to v1",
+ id_server,
+ )
+ else:
+ logger.warning("Error when looking up hashing details: %s", e)
+ return None
+
+ return (yield self._lookup_3pid_v1(id_server, medium, address))
+
+ @defer.inlineCallbacks
+ def _lookup_3pid_v1(self, id_server, medium, address):
+ """Looks up a 3pid in the passed identity server using v1 lookup.
+
+ Args:
+ id_server (str): The server name (including port, if required)
+ of the identity server to use.
+ medium (str): The type of the third party identifier (e.g. "email").
+ address (str): The third party identifier (e.g. "foo@example.com").
+
+ Returns:
+ str: the matrix ID of the 3pid, or None if it is not recognized.
+ """
+ try:
+ data = yield self.blacklisting_http_client.get_json(
+ "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
+ {"medium": medium, "address": address},
+ )
+
+ if "mxid" in data:
+ if "signatures" not in data:
+ raise AuthError(401, "No signatures on 3pid binding")
+ yield self._verify_any_signature(data, id_server)
+ return data["mxid"]
+ except TimeoutError:
+ raise SynapseError(500, "Timed out contacting identity server")
+ except IOError as e:
+ logger.warning("Error from v1 identity server lookup: %s" % (e,))
+
+ return None
+
+ @defer.inlineCallbacks
+ def _lookup_3pid_v2(self, id_server, id_access_token, medium, address):
+ """Looks up a 3pid in the passed identity server using v2 lookup.
+
+ Args:
+ id_server (str): The server name (including port, if required)
+ of the identity server to use.
+ id_access_token (str): The access token to authenticate to the identity server with
+ medium (str): The type of the third party identifier (e.g. "email").
+ address (str): The third party identifier (e.g. "foo@example.com").
+
+ Returns:
+ Deferred[str|None]: the matrix ID of the 3pid, or None if it is not recognised.
+ """
+ # Check what hashing details are supported by this identity server
+ try:
+ hash_details = yield self.blacklisting_http_client.get_json(
+ "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server),
+ {"access_token": id_access_token},
+ )
+ except TimeoutError:
+ raise SynapseError(500, "Timed out contacting identity server")
+
+ if not isinstance(hash_details, dict):
+ logger.warning(
+ "Got non-dict object when checking hash details of %s%s: %s",
+ id_server_scheme,
+ id_server,
+ hash_details,
+ )
+ raise SynapseError(
+ 400,
+ "Non-dict object from %s%s during v2 hash_details request: %s"
+ % (id_server_scheme, id_server, hash_details),
+ )
+
+ # Extract information from hash_details
+ supported_lookup_algorithms = hash_details.get("algorithms")
+ lookup_pepper = hash_details.get("lookup_pepper")
+ if (
+ not supported_lookup_algorithms
+ or not isinstance(supported_lookup_algorithms, list)
+ or not lookup_pepper
+ or not isinstance(lookup_pepper, str)
+ ):
+ raise SynapseError(
+ 400,
+ "Invalid hash details received from identity server %s%s: %s"
+ % (id_server_scheme, id_server, hash_details),
+ )
+
+ # Check if any of the supported lookup algorithms are present
+ if LookupAlgorithm.SHA256 in supported_lookup_algorithms:
+ # Perform a hashed lookup
+ lookup_algorithm = LookupAlgorithm.SHA256
+
+ # Hash address, medium and the pepper with sha256
+ to_hash = "%s %s %s" % (address, medium, lookup_pepper)
+ lookup_value = sha256_and_url_safe_base64(to_hash)
+
+ elif LookupAlgorithm.NONE in supported_lookup_algorithms:
+ # Perform a non-hashed lookup
+ lookup_algorithm = LookupAlgorithm.NONE
+
+ # Combine together plaintext address and medium
+ lookup_value = "%s %s" % (address, medium)
+
+ else:
+ logger.warning(
+ "None of the provided lookup algorithms of %s are supported: %s",
+ id_server,
+ supported_lookup_algorithms,
+ )
+ raise SynapseError(
+ 400,
+ "Provided identity server does not support any v2 lookup "
+ "algorithms that this homeserver supports.",
+ )
+
+ # Authenticate with identity server given the access token from the client
+ headers = {"Authorization": create_id_access_token_header(id_access_token)}
+
+ try:
+ lookup_results = yield self.blacklisting_http_client.post_json_get_json(
+ "%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server),
+ {
+ "addresses": [lookup_value],
+ "algorithm": lookup_algorithm,
+ "pepper": lookup_pepper,
+ },
+ headers=headers,
+ )
+ except TimeoutError:
+ raise SynapseError(500, "Timed out contacting identity server")
+ except Exception as e:
+ logger.warning("Error when performing a v2 3pid lookup: %s", e)
+ raise SynapseError(
+ 500, "Unknown error occurred during identity server lookup"
+ )
+
+ # Check for a mapping from what we looked up to an MXID
+ if "mappings" not in lookup_results or not isinstance(
+ lookup_results["mappings"], dict
+ ):
+ logger.warning("No results from 3pid lookup")
+ return None
+
+ # Return the MXID if it's available, or None otherwise
+ mxid = lookup_results["mappings"].get(lookup_value)
+ return mxid
+
+ @defer.inlineCallbacks
+ def _verify_any_signature(self, data, server_hostname):
+ if server_hostname not in data["signatures"]:
+ raise AuthError(401, "No signature from server %s" % (server_hostname,))
+ for key_name, signature in data["signatures"][server_hostname].items():
+ try:
+ key_data = yield self.blacklisting_http_client.get_json(
+ "%s%s/_matrix/identity/api/v1/pubkey/%s"
+ % (id_server_scheme, server_hostname, key_name)
+ )
+ except TimeoutError:
+ raise SynapseError(500, "Timed out contacting identity server")
+ if "public_key" not in key_data:
+ raise AuthError(
+ 401, "No public key named %s from %s" % (key_name, server_hostname)
+ )
+ verify_signed_json(
+ data,
+ server_hostname,
+ decode_verify_key_bytes(
+ key_name, decode_base64(key_data["public_key"])
+ ),
+ )
+ return
+
+ @defer.inlineCallbacks
+ def ask_id_server_for_third_party_invite(
+ self,
+ requester,
+ id_server,
+ medium,
+ address,
+ room_id,
+ inviter_user_id,
+ room_alias,
+ room_avatar_url,
+ room_join_rules,
+ room_name,
+ inviter_display_name,
+ inviter_avatar_url,
+ id_access_token=None,
+ ):
+ """
+ Asks an identity server for a third party invite.
+
+ Args:
+ requester (Requester)
+ id_server (str): hostname + optional port for the identity server.
+ medium (str): The literal string "email".
+ address (str): The third party address being invited.
+ room_id (str): The ID of the room to which the user is invited.
+ inviter_user_id (str): The user ID of the inviter.
+ room_alias (str): An alias for the room, for cosmetic notifications.
+ room_avatar_url (str): The URL of the room's avatar, for cosmetic
+ notifications.
+ room_join_rules (str): The join rules of the email (e.g. "public").
+ room_name (str): The m.room.name of the room.
+ inviter_display_name (str): The current display name of the
+ inviter.
+ inviter_avatar_url (str): The URL of the inviter's avatar.
+ id_access_token (str|None): The access token to authenticate to the identity
+ server with
+
+ Returns:
+ A deferred tuple containing:
+ token (str): The token which must be signed to prove authenticity.
+ public_keys ([{"public_key": str, "key_validity_url": str}]):
+ public_key is a base64-encoded ed25519 public key.
+ fallback_public_key: One element from public_keys.
+ display_name (str): A user-friendly name to represent the invited
+ user.
+ """
+ invite_config = {
+ "medium": medium,
+ "address": address,
+ "room_id": room_id,
+ "room_alias": room_alias,
+ "room_avatar_url": room_avatar_url,
+ "room_join_rules": room_join_rules,
+ "room_name": room_name,
+ "sender": inviter_user_id,
+ "sender_display_name": inviter_display_name,
+ "sender_avatar_url": inviter_avatar_url,
+ }
+
+ # Add the identity service access token to the JSON body and use the v2
+ # Identity Service endpoints if id_access_token is present
+ data = None
+ base_url = "%s%s/_matrix/identity" % (id_server_scheme, id_server)
+
+ if id_access_token:
+ key_validity_url = "%s%s/_matrix/identity/v2/pubkey/isvalid" % (
+ id_server_scheme,
+ id_server,
+ )
+
+ # Attempt a v2 lookup
+ url = base_url + "/v2/store-invite"
+ try:
+ data = yield self.blacklisting_http_client.post_json_get_json(
+ url,
+ invite_config,
+ {"Authorization": create_id_access_token_header(id_access_token)},
+ )
+ except TimeoutError:
+ raise SynapseError(500, "Timed out contacting identity server")
+ except HttpResponseException as e:
+ if e.code != 404:
+ logger.info("Failed to POST %s with JSON: %s", url, e)
+ raise e
+
+ if data is None:
+ key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
+ id_server_scheme,
+ id_server,
+ )
+ url = base_url + "/api/v1/store-invite"
+
+ try:
+ data = yield self.blacklisting_http_client.post_json_get_json(
+ url, invite_config
+ )
+ except TimeoutError:
+ raise SynapseError(500, "Timed out contacting identity server")
+ except HttpResponseException as e:
+ logger.warning(
+ "Error trying to call /store-invite on %s%s: %s",
+ id_server_scheme,
+ id_server,
+ e,
+ )
+
+ if data is None:
+ # Some identity servers may only support application/x-www-form-urlencoded
+ # types. This is especially true with old instances of Sydent, see
+ # https://github.com/matrix-org/sydent/pull/170
+ try:
+ data = yield self.blacklisting_http_client.post_urlencoded_get_json(
+ url, invite_config
+ )
+ except HttpResponseException as e:
+ logger.warning(
+ "Error calling /store-invite on %s%s with fallback "
+ "encoding: %s",
+ id_server_scheme,
+ id_server,
+ e,
+ )
+ raise e
+
+ # TODO: Check for success
+ token = data["token"]
+ public_keys = data.get("public_keys", [])
+ if "public_key" in data:
+ fallback_public_key = {
+ "public_key": data["public_key"],
+ "key_validity_url": key_validity_url,
+ }
+ else:
+ fallback_public_key = public_keys[0]
+
+ if not public_keys:
+ public_keys.append(fallback_public_key)
+ display_name = data["display_name"]
+ return token, public_keys, fallback_public_key, display_name
+
+
+def create_id_access_token_header(id_access_token):
+ """Create an Authorization header for passing to SimpleHttpClient as the header value
+ of an HTTP request.
+
+ Args:
+ id_access_token (str): An identity server access token.
+
+ Returns:
+ list[str]: The ascii-encoded bearer token encased in a list.
+ """
+ # Prefix with Bearer
+ bearer_token = "Bearer %s" % id_access_token
+
+ # Encode headers to standard ascii
+ bearer_token.encode("ascii")
+
+ # Return as a list as that's how SimpleHttpClient takes header values
+ return [bearer_token]
+
+
+class LookupAlgorithm:
+ """
+ Supported hashing algorithms when performing a 3PID lookup.
+
+ SHA256 - Hashing an (address, medium, pepper) combo with sha256, then url-safe base64
+ encoding
+ NONE - Not performing any hashing. Simply sending an (address, medium) combo in plaintext
+ """
+
+ SHA256 = "sha256"
+ NONE = "none"
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 111f7c7e2f..0f8cce8ffe 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -222,6 +222,13 @@ class MessageHandler(object):
}
+# The duration (in ms) after which rooms should be removed
+# `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try
+# to generate a dummy event for them once more)
+#
+_DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
+
+
class EventCreationHandler(object):
def __init__(self, hs):
self.hs = hs
@@ -258,6 +265,13 @@ class EventCreationHandler(object):
self.config.block_events_without_consent_error
)
+ # Rooms which should be excluded from dummy insertion. (For instance,
+ # those without local users who can send events into the room).
+ #
+ # map from room id to time-of-last-attempt.
+ #
+ self._rooms_to_exclude_from_dummy_event_insertion = {} # type: dict[str, int]
+
# we need to construct a ConsentURIBuilder here, as it checks that the necessary
# config options, but *only* if we have a configuration for which we are
# going to need it.
@@ -729,7 +743,27 @@ class EventCreationHandler(object):
assert not self.config.worker_app
if ratelimit:
- yield self.base_handler.ratelimit(requester)
+ # We check if this is a room admin redacting an event so that we
+ # can apply different ratelimiting. We do this by simply checking
+ # it's not a self-redaction (to avoid having to look up whether the
+ # user is actually admin or not).
+ is_admin_redaction = False
+ if event.type == EventTypes.Redaction:
+ original_event = yield self.store.get_event(
+ event.redacts,
+ check_redacted=False,
+ get_prev_content=False,
+ allow_rejected=False,
+ allow_none=True,
+ )
+
+ is_admin_redaction = (
+ original_event and event.sender != original_event.sender
+ )
+
+ yield self.base_handler.ratelimit(
+ requester, is_admin_redaction=is_admin_redaction
+ )
yield self.base_handler.maybe_kick_guest_users(event, context)
@@ -868,9 +902,11 @@ class EventCreationHandler(object):
"""Background task to send dummy events into rooms that have a large
number of extremities
"""
-
+ self._expire_rooms_to_exclude_from_dummy_event_insertion()
room_ids = yield self.store.get_rooms_with_many_extremities(
- min_count=10, limit=5
+ min_count=10,
+ limit=5,
+ room_id_filter=self._rooms_to_exclude_from_dummy_event_insertion.keys(),
)
for room_id in room_ids:
@@ -884,32 +920,61 @@ class EventCreationHandler(object):
members = yield self.state.get_current_users_in_room(
room_id, latest_event_ids=latest_event_ids
)
+ dummy_event_sent = False
+ for user_id in members:
+ if not self.hs.is_mine_id(user_id):
+ continue
+ requester = create_requester(user_id)
+ try:
+ event, context = yield self.create_event(
+ requester,
+ {
+ "type": "org.matrix.dummy_event",
+ "content": {},
+ "room_id": room_id,
+ "sender": user_id,
+ },
+ prev_events_and_hashes=prev_events_and_hashes,
+ )
- user_id = None
- for member in members:
- if self.hs.is_mine_id(member):
- user_id = member
- break
-
- if not user_id:
- # We don't have a joined user.
- # TODO: We should do something here to stop the room from
- # appearing next time.
- continue
+ event.internal_metadata.proactively_send = False
- requester = create_requester(user_id)
+ yield self.send_nonmember_event(
+ requester, event, context, ratelimit=False
+ )
+ dummy_event_sent = True
+ break
+ except ConsentNotGivenError:
+ logger.info(
+ "Failed to send dummy event into room %s for user %s due to "
+ "lack of consent. Will try another user" % (room_id, user_id)
+ )
+ except AuthError:
+ logger.info(
+ "Failed to send dummy event into room %s for user %s due to "
+ "lack of power. Will try another user" % (room_id, user_id)
+ )
- event, context = yield self.create_event(
- requester,
- {
- "type": "org.matrix.dummy_event",
- "content": {},
- "room_id": room_id,
- "sender": user_id,
- },
- prev_events_and_hashes=prev_events_and_hashes,
+ if not dummy_event_sent:
+ # Did not find a valid user in the room, so remove from future attempts
+ # Exclusion is time limited, so the room will be rechecked in the future
+ # dependent on _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
+ logger.info(
+ "Failed to send dummy event into room %s. Will exclude it from "
+ "future attempts until cache expires" % (room_id,)
+ )
+ now = self.clock.time_msec()
+ self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now
+
+ def _expire_rooms_to_exclude_from_dummy_event_insertion(self):
+ expire_before = self.clock.time_msec() - _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
+ to_expire = set()
+ for room_id, time in self._rooms_to_exclude_from_dummy_event_insertion.items():
+ if time < expire_before:
+ to_expire.add(room_id)
+ for room_id in to_expire:
+ logger.debug(
+ "Expiring room id %s from dummy event insertion exclusion cache",
+ room_id,
)
-
- event.internal_metadata.proactively_send = False
-
- yield self.send_nonmember_event(requester, event, context, ratelimit=False)
+ del self._rooms_to_exclude_from_dummy_event_insertion[room_id]
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 053cf66b28..eda15bc623 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -24,6 +24,7 @@ The methods that define policy are:
import logging
from contextlib import contextmanager
+from typing import Dict, Set
from six import iteritems, itervalues
@@ -179,8 +180,9 @@ class PresenceHandler(object):
# we assume that all the sync requests on that process have stopped.
# Stored as a dict from process_id to set of user_id, and a dict of
# process_id to millisecond timestamp last updated.
- self.external_process_to_current_syncs = {}
- self.external_process_last_updated_ms = {}
+ self.external_process_to_current_syncs = {} # type: Dict[int, Set[str]]
+ self.external_process_last_updated_ms = {} # type: Dict[int, int]
+
self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
# Start a LoopingCall in 30s that fires every 5s.
@@ -349,10 +351,13 @@ class PresenceHandler(object):
if now - last_update > EXTERNAL_PROCESS_EXPIRY
]
for process_id in expired_process_ids:
+ # For each expired process drop tracking info and check the users
+ # that were syncing on that process to see if they need to be timed
+ # out.
users_to_check.update(
- self.external_process_last_updated_ms.pop(process_id, ())
+ self.external_process_to_current_syncs.pop(process_id, ())
)
- self.external_process_last_update.pop(process_id)
+ self.external_process_last_updated_ms.pop(process_id)
states = [
self.user_to_current_state.get(user_id, UserPresenceState.default(user_id))
@@ -803,17 +808,25 @@ class PresenceHandler(object):
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "presence_delta"):
- deltas = yield self.store.get_current_state_deltas(self._event_pos)
- if not deltas:
+ room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+ if self._event_pos == room_max_stream_ordering:
return
+ logger.debug(
+ "Processing presence stats %s->%s",
+ self._event_pos,
+ room_max_stream_ordering,
+ )
+ max_pos, deltas = yield self.store.get_current_state_deltas(
+ self._event_pos, room_max_stream_ordering
+ )
yield self._handle_state_delta(deltas)
- self._event_pos = deltas[-1]["stream_id"]
+ self._event_pos = max_pos
# Expose current event processing position to prometheus
synapse.metrics.event_processing_positions.labels("presence").set(
- self._event_pos
+ max_pos
)
@defer.inlineCallbacks
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 975da57ffd..53410f120b 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -217,10 +217,9 @@ class RegistrationHandler(BaseHandler):
else:
# autogen a sequential user ID
- attempts = 0
user = None
while not user:
- localpart = yield self._generate_user_id(attempts > 0)
+ localpart = yield self._generate_user_id()
user = UserID(localpart, self.hs.hostname)
user_id = user.to_string()
yield self.check_user_id_not_appservice_exclusive(user_id)
@@ -238,7 +237,6 @@ class RegistrationHandler(BaseHandler):
# if user id is taken, just generate another
user = None
user_id = None
- attempts += 1
if not self.hs.config.user_consent_at_registration:
yield self._auto_join_rooms(user_id)
@@ -275,16 +273,12 @@ class RegistrationHandler(BaseHandler):
fake_requester = create_requester(user_id)
# try to create the room if we're the first real user on the server. Note
- # that an auto-generated support user is not a real user and will never be
+ # that an auto-generated support or bot user is not a real user and will never be
# the user to create the room
should_auto_create_rooms = False
- is_support = yield self.store.is_support_user(user_id)
- # There is an edge case where the first user is the support user, then
- # the room is never created, though this seems unlikely and
- # recoverable from given the support user being involved in the first
- # place.
- if self.hs.config.autocreate_auto_join_rooms and not is_support:
- count = yield self.store.count_all_users()
+ is_real_user = yield self.store.is_real_user(user_id)
+ if self.hs.config.autocreate_auto_join_rooms and is_real_user:
+ count = yield self.store.count_real_users()
should_auto_create_rooms = count == 1
for r in self.hs.config.auto_join_rooms:
logger.info("Auto-joining %s to %s", user_id, r)
@@ -383,10 +377,10 @@ class RegistrationHandler(BaseHandler):
)
@defer.inlineCallbacks
- def _generate_user_id(self, reseed=False):
- if reseed or self._next_generated_user_id is None:
+ def _generate_user_id(self):
+ if self._next_generated_user_id is None:
with (yield self._generate_user_id_linearizer.queue(())):
- if reseed or self._next_generated_user_id is None:
+ if self._next_generated_user_id is None:
self._next_generated_user_id = (
yield self.store.find_next_generated_user_id_localpart()
)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index a509e11d69..2816bd8f87 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -28,6 +28,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.http.endpoint import parse_and_validate_server_name
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
@@ -554,7 +555,8 @@ class RoomCreationHandler(BaseHandler):
invite_list = config.get("invite", [])
for i in invite_list:
try:
- UserID.from_string(i)
+ uid = UserID.from_string(i)
+ parse_and_validate_server_name(uid.domain)
except Exception:
raise SynapseError(400, "Invalid user_id: %s" % (i,))
@@ -579,8 +581,8 @@ class RoomCreationHandler(BaseHandler):
room_id = yield self._generate_room_id(creator_id=user_id, is_public=is_public)
+ directory_handler = self.hs.get_handlers().directory_handler
if room_alias:
- directory_handler = self.hs.get_handlers().directory_handler
yield directory_handler.create_association(
requester=requester,
room_id=room_id,
@@ -665,6 +667,7 @@ class RoomCreationHandler(BaseHandler):
for invite_3pid in invite_3pid_list:
id_server = invite_3pid["id_server"]
+ id_access_token = invite_3pid.get("id_access_token") # optional
address = invite_3pid["address"]
medium = invite_3pid["medium"]
yield self.hs.get_room_member_handler().do_3pid_invite(
@@ -675,6 +678,7 @@ class RoomCreationHandler(BaseHandler):
id_server,
requester,
txn_id=None,
+ id_access_token=id_access_token,
)
result = {"room_id": room_id}
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index a7e55f00e5..c615206df1 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -16,8 +16,7 @@
import logging
from collections import namedtuple
-from six import PY3, iteritems
-from six.moves import range
+from six import iteritems
import msgpack
from unpaddedbase64 import decode_base64, encode_base64
@@ -27,7 +26,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules
from synapse.api.errors import Codes, HttpResponseException
from synapse.types import ThirdPartyInstanceID
-from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
@@ -37,7 +35,6 @@ logger = logging.getLogger(__name__)
REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
-
# This is used to indicate we should only return rooms published to the main list.
EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
@@ -72,6 +69,8 @@ class RoomListHandler(BaseHandler):
This can be (None, None) to indicate the main list, or a particular
appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists.
+ from_federation (bool): true iff the request comes from the federation
+ API
"""
if not self.enable_room_list_search:
return defer.succeed({"chunk": [], "total_room_count_estimate": 0})
@@ -89,16 +88,8 @@ class RoomListHandler(BaseHandler):
# appservice specific lists.
logger.info("Bypassing cache as search request.")
- # XXX: Quick hack to stop room directory queries taking too long.
- # Timeout request after 60s. Probably want a more fundamental
- # solution at some point
- timeout = self.clock.time() + 60
return self._get_public_room_list(
- limit,
- since_token,
- search_filter,
- network_tuple=network_tuple,
- timeout=timeout,
+ limit, since_token, search_filter, network_tuple=network_tuple
)
key = (limit, since_token, network_tuple)
@@ -119,7 +110,6 @@ class RoomListHandler(BaseHandler):
search_filter=None,
network_tuple=EMPTY_THIRD_PARTY_ID,
from_federation=False,
- timeout=None,
):
"""Generate a public room list.
Args:
@@ -132,240 +122,116 @@ class RoomListHandler(BaseHandler):
Setting to None returns all public rooms across all lists.
from_federation (bool): Whether this request originated from a
federating server or a client. Used for room filtering.
- timeout (int|None): Amount of seconds to wait for a response before
- timing out.
"""
- if since_token and since_token != "END":
- since_token = RoomListNextBatch.from_token(since_token)
- else:
- since_token = None
- rooms_to_order_value = {}
- rooms_to_num_joined = {}
+ # Pagination tokens work by storing the room ID sent in the last batch,
+ # plus the direction (forwards or backwards). Next batch tokens always
+ # go forwards, prev batch tokens always go backwards.
- newly_visible = []
- newly_unpublished = []
if since_token:
- stream_token = since_token.stream_ordering
- current_public_id = yield self.store.get_current_public_room_stream_id()
- public_room_stream_id = since_token.public_room_stream_id
- newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
- public_room_stream_id, current_public_id, network_tuple=network_tuple
- )
- else:
- stream_token = yield self.store.get_room_max_stream_ordering()
- public_room_stream_id = yield self.store.get_current_public_room_stream_id()
-
- room_ids = yield self.store.get_public_room_ids_at_stream_id(
- public_room_stream_id, network_tuple=network_tuple
- )
-
- # We want to return rooms in a particular order: the number of joined
- # users. We then arbitrarily use the room_id as a tie breaker.
-
- @defer.inlineCallbacks
- def get_order_for_room(room_id):
- # Most of the rooms won't have changed between the since token and
- # now (especially if the since token is "now"). So, we can ask what
- # the current users are in a room (that will hit a cache) and then
- # check if the room has changed since the since token. (We have to
- # do it in that order to avoid races).
- # If things have changed then fall back to getting the current state
- # at the since token.
- joined_users = yield self.store.get_users_in_room(room_id)
- if self.store.has_room_changed_since(room_id, stream_token):
- latest_event_ids = yield self.store.get_forward_extremeties_for_room(
- room_id, stream_token
- )
-
- if not latest_event_ids:
- return
+ batch_token = RoomListNextBatch.from_token(since_token)
- joined_users = yield self.state_handler.get_current_users_in_room(
- room_id, latest_event_ids
- )
-
- num_joined_users = len(joined_users)
- rooms_to_num_joined[room_id] = num_joined_users
+ bounds = (batch_token.last_joined_members, batch_token.last_room_id)
+ forwards = batch_token.direction_is_forward
+ else:
+ batch_token = None
+ bounds = None
- if num_joined_users == 0:
- return
+ forwards = True
- # We want larger rooms to be first, hence negating num_joined_users
- rooms_to_order_value[room_id] = (-num_joined_users, room_id)
+ # we request one more than wanted to see if there are more pages to come
+ probing_limit = limit + 1 if limit is not None else None
- logger.info(
- "Getting ordering for %i rooms since %s", len(room_ids), stream_token
+ results = yield self.store.get_largest_public_rooms(
+ network_tuple,
+ search_filter,
+ probing_limit,
+ bounds=bounds,
+ forwards=forwards,
+ ignore_non_federatable=from_federation,
)
- yield concurrently_execute(get_order_for_room, room_ids, 10)
- sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
- sorted_rooms = [room_id for room_id, _ in sorted_entries]
+ def build_room_entry(room):
+ entry = {
+ "room_id": room["room_id"],
+ "name": room["name"],
+ "topic": room["topic"],
+ "canonical_alias": room["canonical_alias"],
+ "num_joined_members": room["joined_members"],
+ "avatar_url": room["avatar"],
+ "world_readable": room["history_visibility"] == "world_readable",
+ "guest_can_join": room["guest_access"] == "can_join",
+ }
- # `sorted_rooms` should now be a list of all public room ids that is
- # stable across pagination. Therefore, we can use indices into this
- # list as our pagination tokens.
+ # Filter out Nones – rather omit the field altogether
+ return {k: v for k, v in entry.items() if v is not None}
- # Filter out rooms that we don't want to return
- rooms_to_scan = [
- r
- for r in sorted_rooms
- if r not in newly_unpublished and rooms_to_num_joined[r] > 0
- ]
+ results = [build_room_entry(r) for r in results]
- total_room_count = len(rooms_to_scan)
+ response = {}
+ num_results = len(results)
+ if limit is not None:
+ more_to_come = num_results == probing_limit
- if since_token:
- # Filter out rooms we've already returned previously
- # `since_token.current_limit` is the index of the last room we
- # sent down, so we exclude it and everything before/after it.
- if since_token.direction_is_forward:
- rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :]
+ # Depending on direction we trim either the front or back.
+ if forwards:
+ results = results[:limit]
else:
- rooms_to_scan = rooms_to_scan[: since_token.current_limit]
- rooms_to_scan.reverse()
-
- logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan))
-
- # _append_room_entry_to_chunk will append to chunk but will stop if
- # len(chunk) > limit
- #
- # Normally we will generate enough results on the first iteration here,
- # but if there is a search filter, _append_room_entry_to_chunk may
- # filter some results out, in which case we loop again.
- #
- # We don't want to scan over the entire range either as that
- # would potentially waste a lot of work.
- #
- # XXX if there is no limit, we may end up DoSing the server with
- # calls to get_current_state_ids for every single room on the
- # server. Surely we should cap this somehow?
- #
- if limit:
- step = limit + 1
+ results = results[-limit:]
else:
- # step cannot be zero
- step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
-
- chunk = []
- for i in range(0, len(rooms_to_scan), step):
- if timeout and self.clock.time() > timeout:
- raise Exception("Timed out searching room directory")
-
- batch = rooms_to_scan[i : i + step]
- logger.info("Processing %i rooms for result", len(batch))
- yield concurrently_execute(
- lambda r: self._append_room_entry_to_chunk(
- r,
- rooms_to_num_joined[r],
- chunk,
- limit,
- search_filter,
- from_federation=from_federation,
- ),
- batch,
- 5,
- )
- logger.info("Now %i rooms in result", len(chunk))
- if len(chunk) >= limit + 1:
- break
-
- chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
-
- # Work out the new limit of the batch for pagination, or None if we
- # know there are no more results that would be returned.
- # i.e., [since_token.current_limit..new_limit] is the batch of rooms
- # we've returned (or the reverse if we paginated backwards)
- # We tried to pull out limit + 1 rooms above, so if we have <= limit
- # then we know there are no more results to return
- new_limit = None
- if chunk and (not limit or len(chunk) > limit):
-
- if not since_token or since_token.direction_is_forward:
- if limit:
- chunk = chunk[:limit]
- last_room_id = chunk[-1]["room_id"]
+ more_to_come = False
+
+ if num_results > 0:
+ final_entry = results[-1]
+ initial_entry = results[0]
+
+ if forwards:
+ if batch_token:
+ # If there was a token given then we assume that there
+ # must be previous results.
+ response["prev_batch"] = RoomListNextBatch(
+ last_joined_members=initial_entry["num_joined_members"],
+ last_room_id=initial_entry["room_id"],
+ direction_is_forward=False,
+ ).to_token()
+
+ if more_to_come:
+ response["next_batch"] = RoomListNextBatch(
+ last_joined_members=final_entry["num_joined_members"],
+ last_room_id=final_entry["room_id"],
+ direction_is_forward=True,
+ ).to_token()
else:
- if limit:
- chunk = chunk[-limit:]
- last_room_id = chunk[0]["room_id"]
-
- new_limit = sorted_rooms.index(last_room_id)
-
- results = {"chunk": chunk, "total_room_count_estimate": total_room_count}
-
- if since_token:
- results["new_rooms"] = bool(newly_visible)
-
- if not since_token or since_token.direction_is_forward:
- if new_limit is not None:
- results["next_batch"] = RoomListNextBatch(
- stream_ordering=stream_token,
- public_room_stream_id=public_room_stream_id,
- current_limit=new_limit,
- direction_is_forward=True,
- ).to_token()
-
- if since_token:
- results["prev_batch"] = since_token.copy_and_replace(
- direction_is_forward=False,
- current_limit=since_token.current_limit + 1,
- ).to_token()
- else:
- if new_limit is not None:
- results["prev_batch"] = RoomListNextBatch(
- stream_ordering=stream_token,
- public_room_stream_id=public_room_stream_id,
- current_limit=new_limit,
- direction_is_forward=False,
- ).to_token()
-
- if since_token:
- results["next_batch"] = since_token.copy_and_replace(
- direction_is_forward=True,
- current_limit=since_token.current_limit - 1,
- ).to_token()
-
- return results
-
- @defer.inlineCallbacks
- def _append_room_entry_to_chunk(
- self,
- room_id,
- num_joined_users,
- chunk,
- limit,
- search_filter,
- from_federation=False,
- ):
- """Generate the entry for a room in the public room list and append it
- to the `chunk` if it matches the search filter
-
- Args:
- room_id (str): The ID of the room.
- num_joined_users (int): The number of joined users in the room.
- chunk (list)
- limit (int|None): Maximum amount of rooms to display. Function will
- return if length of chunk is greater than limit + 1.
- search_filter (dict|None)
- from_federation (bool): Whether this request originated from a
- federating server or a client. Used for room filtering.
- """
- if limit and len(chunk) > limit + 1:
- # We've already got enough, so lets just drop it.
- return
+ if batch_token:
+ response["next_batch"] = RoomListNextBatch(
+ last_joined_members=final_entry["num_joined_members"],
+ last_room_id=final_entry["room_id"],
+ direction_is_forward=True,
+ ).to_token()
+
+ if more_to_come:
+ response["prev_batch"] = RoomListNextBatch(
+ last_joined_members=initial_entry["num_joined_members"],
+ last_room_id=initial_entry["room_id"],
+ direction_is_forward=False,
+ ).to_token()
+
+ for room in results:
+ # populate search result entries with additional fields, namely
+ # 'aliases'
+ room_id = room["room_id"]
+
+ aliases = yield self.store.get_aliases_for_room(room_id)
+ if aliases:
+ room["aliases"] = aliases
- result = yield self.generate_room_entry(room_id, num_joined_users)
- if not result:
- return
+ response["chunk"] = results
- if from_federation and not result.get("m.federate", True):
- # This is a room that other servers cannot join. Do not show them
- # this room.
- return
+ response["total_room_count_estimate"] = yield self.store.count_public_rooms(
+ network_tuple, ignore_non_federatable=from_federation
+ )
- if _matches_room_entry(result, search_filter):
- chunk.append(result)
+ return response
@cachedInlineCallbacks(num_args=1, cache_context=True)
def generate_room_entry(
@@ -580,18 +446,15 @@ class RoomListNextBatch(
namedtuple(
"RoomListNextBatch",
(
- "stream_ordering", # stream_ordering of the first public room list
- "public_room_stream_id", # public room stream id for first public room list
- "current_limit", # The number of previous rooms returned
+ "last_joined_members", # The count to get rooms after/before
+ "last_room_id", # The room_id to get rooms after/before
"direction_is_forward", # Bool if this is a next_batch, false if prev_batch
),
)
):
-
KEY_DICT = {
- "stream_ordering": "s",
- "public_room_stream_id": "p",
- "current_limit": "n",
+ "last_joined_members": "m",
+ "last_room_id": "r",
"direction_is_forward": "d",
}
@@ -599,13 +462,7 @@ class RoomListNextBatch(
@classmethod
def from_token(cls, token):
- if PY3:
- # The argument raw=False is only available on new versions of
- # msgpack, and only really needed on Python 3. Gate it behind
- # a PY3 check to avoid causing issues on Debian-packaged versions.
- decoded = msgpack.loads(decode_base64(token), raw=False)
- else:
- decoded = msgpack.loads(decode_base64(token))
+ decoded = msgpack.loads(decode_base64(token), raw=False)
return RoomListNextBatch(
**{cls.REVERSE_KEY_DICT[key]: val for key, val in decoded.items()}
)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 093f2ea36e..380e2fad5e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -20,15 +20,11 @@ import logging
from six.moves import http_client
-from signedjson.key import decode_verify_key_bytes
-from signedjson.sign import verify_signed_json
-from unpaddedbase64 import decode_base64
-
from twisted.internet import defer
from synapse import types
from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, HttpResponseException, SynapseError
+from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.types import RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
@@ -37,8 +33,6 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
-id_server_scheme = "https://"
-
class RoomMemberHandler(object):
# TODO(paul): This handler currently contains a messy conflation of
@@ -59,10 +53,10 @@ class RoomMemberHandler(object):
self.auth = hs.get_auth()
self.state_handler = hs.get_state_handler()
self.config = hs.config
- self.simple_http_client = hs.get_simple_http_client()
self.federation_handler = hs.get_handlers().federation_handler
self.directory_handler = hs.get_handlers().directory_handler
+ self.identity_handler = hs.get_handlers().identity_handler
self.registration_handler = hs.get_registration_handler()
self.profile_handler = hs.get_profile_handler()
self.event_creation_handler = hs.get_event_creation_handler()
@@ -100,7 +94,7 @@ class RoomMemberHandler(object):
raise NotImplementedError()
@abc.abstractmethod
- def _remote_reject_invite(self, remote_room_hosts, room_id, target):
+ def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
"""Attempt to reject an invite for a room this server is not in. If we
fail to do so we locally mark the invite as rejected.
@@ -209,23 +203,11 @@ class RoomMemberHandler(object):
prev_member_event = yield self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
if newly_joined:
- yield self._user_joined_room(target, room_id)
-
- # Copy over direct message status and room tags if this is a join
- # on an upgraded room
-
- # Check if this is an upgraded room
- predecessor = yield self.store.get_room_predecessor(room_id)
-
- if predecessor:
- # It is an upgraded room. Copy over old tags
- self.copy_room_tags_and_direct_to_room(
- predecessor["room_id"], room_id, user_id
- )
- # Move over old push rules
- self.store.move_push_rules_from_room_to_room_for_user(
- predecessor["room_id"], room_id, user_id
+ # Copy over user state if we're joining an upgraded room
+ yield self.copy_user_state_if_room_upgrade(
+ room_id, requester.user.to_string()
)
+ yield self._user_joined_room(target, room_id)
elif event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = yield self.store.get_event(prev_member_event_id)
@@ -469,10 +451,16 @@ class RoomMemberHandler(object):
if requester.is_guest:
content["kind"] = "guest"
- ret = yield self._remote_join(
+ remote_join_response = yield self._remote_join(
requester, remote_room_hosts, room_id, target, content
)
- return ret
+
+ # Copy over user state if this is a join on an remote upgraded room
+ yield self.copy_user_state_if_room_upgrade(
+ room_id, requester.user.to_string()
+ )
+
+ return remote_join_response
elif effective_membership_state == Membership.LEAVE:
if not is_host_in_room:
@@ -510,9 +498,39 @@ class RoomMemberHandler(object):
return res
@defer.inlineCallbacks
- def send_membership_event(
- self, requester, event, context, remote_room_hosts=None, ratelimit=True
- ):
+ def copy_user_state_if_room_upgrade(self, new_room_id, user_id):
+ """Copy user-specific information when they join a new room if that new room is the
+ result of a room upgrade
+
+ Args:
+ new_room_id (str): The ID of the room the user is joining
+ user_id (str): The ID of the user
+
+ Returns:
+ Deferred
+ """
+ # Check if the new room is an upgraded room
+ predecessor = yield self.store.get_room_predecessor(new_room_id)
+ if not predecessor:
+ return
+
+ logger.debug(
+ "Found predecessor for %s: %s. Copying over room tags and push " "rules",
+ new_room_id,
+ predecessor,
+ )
+
+ # It is an upgraded room. Copy over old tags
+ yield self.copy_room_tags_and_direct_to_room(
+ predecessor["room_id"], new_room_id, user_id
+ )
+ # Copy over push rules
+ yield self.store.copy_push_rules_from_room_to_room_for_user(
+ predecessor["room_id"], new_room_id, user_id
+ )
+
+ @defer.inlineCallbacks
+ def send_membership_event(self, requester, event, context, ratelimit=True):
"""
Change the membership status of a user in a room.
@@ -522,16 +540,10 @@ class RoomMemberHandler(object):
act as the sender, will be skipped.
event (SynapseEvent): The membership event.
context: The context of the event.
- is_guest (bool): Whether the sender is a guest.
- room_hosts ([str]): Homeservers which are likely to already be in
- the room, and could be danced with in order to join this
- homeserver for the first time.
ratelimit (bool): Whether to rate limit this request.
Raises:
SynapseError if there was a problem changing the membership.
"""
- remote_room_hosts = remote_room_hosts or []
-
target_user = UserID.from_string(event.state_key)
room_id = event.room_id
@@ -634,7 +646,7 @@ class RoomMemberHandler(object):
servers.remove(room_alias.domain)
servers.insert(0, room_alias.domain)
- return (RoomID.from_string(room_id), servers)
+ return RoomID.from_string(room_id), servers
@defer.inlineCallbacks
def _get_inviter(self, user_id, room_id):
@@ -646,7 +658,15 @@ class RoomMemberHandler(object):
@defer.inlineCallbacks
def do_3pid_invite(
- self, room_id, inviter, medium, address, id_server, requester, txn_id
+ self,
+ room_id,
+ inviter,
+ medium,
+ address,
+ id_server,
+ requester,
+ txn_id,
+ id_access_token=None,
):
if self.config.block_non_admin_invites:
is_requester_admin = yield self.auth.is_server_admin(requester.user)
@@ -669,75 +689,42 @@ class RoomMemberHandler(object):
Codes.FORBIDDEN,
)
- invitee = yield self._lookup_3pid(id_server, medium, address)
-
- if invitee:
- yield self.update_membership(
- requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id
- )
- else:
- yield self._make_and_store_3pid_invite(
- requester, id_server, medium, address, room_id, inviter, txn_id=txn_id
- )
-
- @defer.inlineCallbacks
- def _lookup_3pid(self, id_server, medium, address):
- """Looks up a 3pid in the passed identity server.
-
- Args:
- id_server (str): The server name (including port, if required)
- of the identity server to use.
- medium (str): The type of the third party identifier (e.g. "email").
- address (str): The third party identifier (e.g. "foo@example.com").
-
- Returns:
- str: the matrix ID of the 3pid, or None if it is not recognized.
- """
if not self._enable_lookup:
raise SynapseError(
403, "Looking up third-party identifiers is denied from this server"
)
- try:
- data = yield self.simple_http_client.get_json(
- "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
- {"medium": medium, "address": address},
- )
-
- if "mxid" in data:
- if "signatures" not in data:
- raise AuthError(401, "No signatures on 3pid binding")
- yield self._verify_any_signature(data, id_server)
- return data["mxid"]
- except IOError as e:
- logger.warn("Error from identity server lookup: %s" % (e,))
- return None
+ invitee = yield self.identity_handler.lookup_3pid(
+ id_server, medium, address, id_access_token
+ )
- @defer.inlineCallbacks
- def _verify_any_signature(self, data, server_hostname):
- if server_hostname not in data["signatures"]:
- raise AuthError(401, "No signature from server %s" % (server_hostname,))
- for key_name, signature in data["signatures"][server_hostname].items():
- key_data = yield self.simple_http_client.get_json(
- "%s%s/_matrix/identity/api/v1/pubkey/%s"
- % (id_server_scheme, server_hostname, key_name)
+ if invitee:
+ yield self.update_membership(
+ requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id
)
- if "public_key" not in key_data:
- raise AuthError(
- 401, "No public key named %s from %s" % (key_name, server_hostname)
- )
- verify_signed_json(
- data,
- server_hostname,
- decode_verify_key_bytes(
- key_name, decode_base64(key_data["public_key"])
- ),
+ else:
+ yield self._make_and_store_3pid_invite(
+ requester,
+ id_server,
+ medium,
+ address,
+ room_id,
+ inviter,
+ txn_id=txn_id,
+ id_access_token=id_access_token,
)
- return
@defer.inlineCallbacks
def _make_and_store_3pid_invite(
- self, requester, id_server, medium, address, room_id, user, txn_id
+ self,
+ requester,
+ id_server,
+ medium,
+ address,
+ room_id,
+ user,
+ txn_id,
+ id_access_token=None,
):
room_state = yield self.state_handler.get_current_state(room_id)
@@ -773,7 +760,7 @@ class RoomMemberHandler(object):
room_avatar_url = room_avatar_event.content.get("url", "")
token, public_keys, fallback_public_key, display_name = (
- yield self._ask_id_server_for_third_party_invite(
+ yield self.identity_handler.ask_id_server_for_third_party_invite(
requester=requester,
id_server=id_server,
medium=medium,
@@ -786,6 +773,7 @@ class RoomMemberHandler(object):
room_name=room_name,
inviter_display_name=inviter_display_name,
inviter_avatar_url=inviter_avatar_url,
+ id_access_token=id_access_token,
)
)
@@ -809,103 +797,6 @@ class RoomMemberHandler(object):
)
@defer.inlineCallbacks
- def _ask_id_server_for_third_party_invite(
- self,
- requester,
- id_server,
- medium,
- address,
- room_id,
- inviter_user_id,
- room_alias,
- room_avatar_url,
- room_join_rules,
- room_name,
- inviter_display_name,
- inviter_avatar_url,
- ):
- """
- Asks an identity server for a third party invite.
-
- Args:
- requester (Requester)
- id_server (str): hostname + optional port for the identity server.
- medium (str): The literal string "email".
- address (str): The third party address being invited.
- room_id (str): The ID of the room to which the user is invited.
- inviter_user_id (str): The user ID of the inviter.
- room_alias (str): An alias for the room, for cosmetic notifications.
- room_avatar_url (str): The URL of the room's avatar, for cosmetic
- notifications.
- room_join_rules (str): The join rules of the email (e.g. "public").
- room_name (str): The m.room.name of the room.
- inviter_display_name (str): The current display name of the
- inviter.
- inviter_avatar_url (str): The URL of the inviter's avatar.
-
- Returns:
- A deferred tuple containing:
- token (str): The token which must be signed to prove authenticity.
- public_keys ([{"public_key": str, "key_validity_url": str}]):
- public_key is a base64-encoded ed25519 public key.
- fallback_public_key: One element from public_keys.
- display_name (str): A user-friendly name to represent the invited
- user.
- """
-
- is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
- id_server_scheme,
- id_server,
- )
-
- invite_config = {
- "medium": medium,
- "address": address,
- "room_id": room_id,
- "room_alias": room_alias,
- "room_avatar_url": room_avatar_url,
- "room_join_rules": room_join_rules,
- "room_name": room_name,
- "sender": inviter_user_id,
- "sender_display_name": inviter_display_name,
- "sender_avatar_url": inviter_avatar_url,
- }
-
- try:
- data = yield self.simple_http_client.post_json_get_json(
- is_url, invite_config
- )
- except HttpResponseException as e:
- # Some identity servers may only support application/x-www-form-urlencoded
- # types. This is especially true with old instances of Sydent, see
- # https://github.com/matrix-org/sydent/pull/170
- logger.info(
- "Failed to POST %s with JSON, falling back to urlencoded form: %s",
- is_url,
- e,
- )
- data = yield self.simple_http_client.post_urlencoded_get_json(
- is_url, invite_config
- )
-
- # TODO: Check for success
- token = data["token"]
- public_keys = data.get("public_keys", [])
- if "public_key" in data:
- fallback_public_key = {
- "public_key": data["public_key"],
- "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid"
- % (id_server_scheme, id_server),
- }
- else:
- fallback_public_key = public_keys[0]
-
- if not public_keys:
- public_keys.append(fallback_public_key)
- display_name = data["display_name"]
- return token, public_keys, fallback_public_key, display_name
-
- @defer.inlineCallbacks
def _is_host_in_room(self, current_state_ids):
# Have we just created the room, and is this about to be the very
# first member event?
@@ -1057,7 +948,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
# The 'except' clause is very broad, but we need to
# capture everything from DNS failures upwards
#
- logger.warn("Failed to reject invite: %s", e)
+ logger.warning("Failed to reject invite: %s", e)
yield self.store.locally_reject_invite(target.to_string(), room_id)
return {}
diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py
index a1ce6929cf..cc9e6b9bd0 100644
--- a/synapse/handlers/saml_handler.py
+++ b/synapse/handlers/saml_handler.py
@@ -21,6 +21,8 @@ from saml2.client import Saml2Client
from synapse.api.errors import SynapseError
from synapse.http.servlet import parse_string
from synapse.rest.client.v1.login import SSOAuthHandler
+from synapse.types import UserID, map_username_to_mxid_localpart
+from synapse.util.async_helpers import Linearizer
logger = logging.getLogger(__name__)
@@ -29,12 +31,26 @@ class SamlHandler:
def __init__(self, hs):
self._saml_client = Saml2Client(hs.config.saml2_sp_config)
self._sso_auth_handler = SSOAuthHandler(hs)
+ self._registration_handler = hs.get_registration_handler()
+
+ self._clock = hs.get_clock()
+ self._datastore = hs.get_datastore()
+ self._hostname = hs.hostname
+ self._saml2_session_lifetime = hs.config.saml2_session_lifetime
+ self._mxid_source_attribute = hs.config.saml2_mxid_source_attribute
+ self._grandfathered_mxid_source_attribute = (
+ hs.config.saml2_grandfathered_mxid_source_attribute
+ )
+ self._mxid_mapper = hs.config.saml2_mxid_mapper
+
+ # identifier for the external_ids table
+ self._auth_provider_id = "saml"
# a map from saml session id to Saml2SessionData object
self._outstanding_requests_dict = {}
- self._clock = hs.get_clock()
- self._saml2_session_lifetime = hs.config.saml2_session_lifetime
+ # a lock on the mappings
+ self._mapping_lock = Linearizer(name="saml_mapping", clock=self._clock)
def handle_redirect_request(self, client_redirect_url):
"""Handle an incoming request to /login/sso/redirect
@@ -60,7 +76,7 @@ class SamlHandler:
# this shouldn't happen!
raise Exception("prepare_for_authenticate didn't return a Location header")
- def handle_saml_response(self, request):
+ async def handle_saml_response(self, request):
"""Handle an incoming request to /_matrix/saml2/authn_response
Args:
@@ -77,6 +93,10 @@ class SamlHandler:
# the dict.
self.expire_sessions()
+ user_id = await self._map_saml_response_to_user(resp_bytes)
+ self._sso_auth_handler.complete_sso_login(user_id, request, relay_state)
+
+ async def _map_saml_response_to_user(self, resp_bytes):
try:
saml2_auth = self._saml_client.parse_authn_request_response(
resp_bytes,
@@ -91,18 +111,88 @@ class SamlHandler:
logger.warning("SAML2 response was not signed")
raise SynapseError(400, "SAML2 response was not signed")
- if "uid" not in saml2_auth.ava:
+ logger.info("SAML2 response: %s", saml2_auth.origxml)
+ logger.info("SAML2 mapped attributes: %s", saml2_auth.ava)
+
+ try:
+ remote_user_id = saml2_auth.ava["uid"][0]
+ except KeyError:
logger.warning("SAML2 response lacks a 'uid' attestation")
raise SynapseError(400, "uid not in SAML2 response")
+ try:
+ mxid_source = saml2_auth.ava[self._mxid_source_attribute][0]
+ except KeyError:
+ logger.warning(
+ "SAML2 response lacks a '%s' attestation", self._mxid_source_attribute
+ )
+ raise SynapseError(
+ 400, "%s not in SAML2 response" % (self._mxid_source_attribute,)
+ )
+
self._outstanding_requests_dict.pop(saml2_auth.in_response_to, None)
- username = saml2_auth.ava["uid"][0]
displayName = saml2_auth.ava.get("displayName", [None])[0]
- return self._sso_auth_handler.on_successful_auth(
- username, request, relay_state, user_display_name=displayName
- )
+ with (await self._mapping_lock.queue(self._auth_provider_id)):
+ # first of all, check if we already have a mapping for this user
+ logger.info(
+ "Looking for existing mapping for user %s:%s",
+ self._auth_provider_id,
+ remote_user_id,
+ )
+ registered_user_id = await self._datastore.get_user_by_external_id(
+ self._auth_provider_id, remote_user_id
+ )
+ if registered_user_id is not None:
+ logger.info("Found existing mapping %s", registered_user_id)
+ return registered_user_id
+
+ # backwards-compatibility hack: see if there is an existing user with a
+ # suitable mapping from the uid
+ if (
+ self._grandfathered_mxid_source_attribute
+ and self._grandfathered_mxid_source_attribute in saml2_auth.ava
+ ):
+ attrval = saml2_auth.ava[self._grandfathered_mxid_source_attribute][0]
+ user_id = UserID(
+ map_username_to_mxid_localpart(attrval), self._hostname
+ ).to_string()
+ logger.info(
+ "Looking for existing account based on mapped %s %s",
+ self._grandfathered_mxid_source_attribute,
+ user_id,
+ )
+
+ users = await self._datastore.get_users_by_id_case_insensitive(user_id)
+ if users:
+ registered_user_id = list(users.keys())[0]
+ logger.info("Grandfathering mapping to %s", registered_user_id)
+ await self._datastore.record_user_external_id(
+ self._auth_provider_id, remote_user_id, registered_user_id
+ )
+ return registered_user_id
+
+ # figure out a new mxid for this user
+ base_mxid_localpart = self._mxid_mapper(mxid_source)
+
+ suffix = 0
+ while True:
+ localpart = base_mxid_localpart + (str(suffix) if suffix else "")
+ if not await self._datastore.get_users_by_id_case_insensitive(
+ UserID(localpart, self._hostname).to_string()
+ ):
+ break
+ suffix += 1
+ logger.info("Allocating mxid for new user with localpart %s", localpart)
+
+ registered_user_id = await self._registration_handler.register_user(
+ localpart=localpart, default_display_name=displayName
+ )
+ await self._datastore.record_user_external_id(
+ self._auth_provider_id, remote_user_id, registered_user_id
+ )
+ return registered_user_id
def expire_sessions(self):
expire_before = self._clock.time_msec() - self._saml2_session_lifetime
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 3c265f3718..466daf9202 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -84,17 +84,26 @@ class StatsHandler(StateDeltasHandler):
# Loop round handling deltas until we're up to date
while True:
- deltas = yield self.store.get_current_state_deltas(self.pos)
+ # Be sure to read the max stream_ordering *before* checking if there are any outstanding
+ # deltas, since there is otherwise a chance that we could miss updates which arrive
+ # after we check the deltas.
+ room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+ if self.pos == room_max_stream_ordering:
+ break
+
+ logger.debug(
+ "Processing room stats %s->%s", self.pos, room_max_stream_ordering
+ )
+ max_pos, deltas = yield self.store.get_current_state_deltas(
+ self.pos, room_max_stream_ordering
+ )
if deltas:
logger.debug("Handling %d state deltas", len(deltas))
room_deltas, user_deltas = yield self._handle_deltas(deltas)
-
- max_pos = deltas[-1]["stream_id"]
else:
room_deltas = {}
user_deltas = {}
- max_pos = yield self.store.get_room_max_stream_ordering()
# Then count deltas for total_events and total_event_bytes.
room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes(
@@ -117,10 +126,9 @@ class StatsHandler(StateDeltasHandler):
stream_id=max_pos,
)
- event_processing_positions.labels("stats").set(max_pos)
+ logger.debug("Handled room stats to %s -> %s", self.pos, max_pos)
- if self.pos == max_pos:
- break
+ event_processing_positions.labels("stats").set(max_pos)
self.pos = max_pos
@@ -287,6 +295,7 @@ class StatsHandler(StateDeltasHandler):
room_state["guest_access"] = event_content.get("guest_access")
for room_id, state in room_to_state_updates.items():
+ logger.info("Updating room_stats_state for %s: %s", room_id, state)
yield self.store.update_room_state(room_id, state)
return room_to_stats_deltas, user_to_stats_deltas
diff --git a/synapse/handlers/ui_auth/__init__.py b/synapse/handlers/ui_auth/__init__.py
new file mode 100644
index 0000000000..824f37f8f8
--- /dev/null
+++ b/synapse/handlers/ui_auth/__init__.py
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""This module implements user-interactive auth verification.
+
+TODO: move more stuff out of AuthHandler in here.
+
+"""
+
+from synapse.handlers.ui_auth.checkers import INTERACTIVE_AUTH_CHECKERS # noqa: F401
diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
new file mode 100644
index 0000000000..29aa1e5aaf
--- /dev/null
+++ b/synapse/handlers/ui_auth/checkers.py
@@ -0,0 +1,247 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from canonicaljson import json
+
+from twisted.internet import defer
+from twisted.web.client import PartialDownloadError
+
+from synapse.api.constants import LoginType
+from synapse.api.errors import Codes, LoginError, SynapseError
+from synapse.config.emailconfig import ThreepidBehaviour
+
+logger = logging.getLogger(__name__)
+
+
+class UserInteractiveAuthChecker:
+ """Abstract base class for an interactive auth checker"""
+
+ def __init__(self, hs):
+ pass
+
+ def is_enabled(self):
+ """Check if the configuration of the homeserver allows this checker to work
+
+ Returns:
+ bool: True if this login type is enabled.
+ """
+
+ def check_auth(self, authdict, clientip):
+ """Given the authentication dict from the client, attempt to check this step
+
+ Args:
+ authdict (dict): authentication dictionary from the client
+ clientip (str): The IP address of the client.
+
+ Raises:
+ SynapseError if authentication failed
+
+ Returns:
+ Deferred: the result of authentication (to pass back to the client?)
+ """
+ raise NotImplementedError()
+
+
+class DummyAuthChecker(UserInteractiveAuthChecker):
+ AUTH_TYPE = LoginType.DUMMY
+
+ def is_enabled(self):
+ return True
+
+ def check_auth(self, authdict, clientip):
+ return defer.succeed(True)
+
+
+class TermsAuthChecker(UserInteractiveAuthChecker):
+ AUTH_TYPE = LoginType.TERMS
+
+ def is_enabled(self):
+ return True
+
+ def check_auth(self, authdict, clientip):
+ return defer.succeed(True)
+
+
+class RecaptchaAuthChecker(UserInteractiveAuthChecker):
+ AUTH_TYPE = LoginType.RECAPTCHA
+
+ def __init__(self, hs):
+ super().__init__(hs)
+ self._enabled = bool(hs.config.recaptcha_private_key)
+ self._http_client = hs.get_simple_http_client()
+ self._url = hs.config.recaptcha_siteverify_api
+ self._secret = hs.config.recaptcha_private_key
+
+ def is_enabled(self):
+ return self._enabled
+
+ @defer.inlineCallbacks
+ def check_auth(self, authdict, clientip):
+ try:
+ user_response = authdict["response"]
+ except KeyError:
+ # Client tried to provide captcha but didn't give the parameter:
+ # bad request.
+ raise LoginError(
+ 400, "Captcha response is required", errcode=Codes.CAPTCHA_NEEDED
+ )
+
+ logger.info(
+ "Submitting recaptcha response %s with remoteip %s", user_response, clientip
+ )
+
+ # TODO: get this from the homeserver rather than creating a new one for
+ # each request
+ try:
+ resp_body = yield self._http_client.post_urlencoded_get_json(
+ self._url,
+ args={
+ "secret": self._secret,
+ "response": user_response,
+ "remoteip": clientip,
+ },
+ )
+ except PartialDownloadError as pde:
+ # Twisted is silly
+ data = pde.response
+ resp_body = json.loads(data)
+
+ if "success" in resp_body:
+ # Note that we do NOT check the hostname here: we explicitly
+ # intend the CAPTCHA to be presented by whatever client the
+ # user is using, we just care that they have completed a CAPTCHA.
+ logger.info(
+ "%s reCAPTCHA from hostname %s",
+ "Successful" if resp_body["success"] else "Failed",
+ resp_body.get("hostname"),
+ )
+ if resp_body["success"]:
+ return True
+ raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+
+
+class _BaseThreepidAuthChecker:
+ def __init__(self, hs):
+ self.hs = hs
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def _check_threepid(self, medium, authdict):
+ if "threepid_creds" not in authdict:
+ raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
+
+ threepid_creds = authdict["threepid_creds"]
+
+ identity_handler = self.hs.get_handlers().identity_handler
+
+ logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
+
+ # msisdns are currently always ThreepidBehaviour.REMOTE
+ if medium == "msisdn":
+ if not self.hs.config.account_threepid_delegate_msisdn:
+ raise SynapseError(
+ 400, "Phone number verification is not enabled on this homeserver"
+ )
+ threepid = yield identity_handler.threepid_from_creds(
+ self.hs.config.account_threepid_delegate_msisdn, threepid_creds
+ )
+ elif medium == "email":
+ if self.hs.config.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
+ assert self.hs.config.account_threepid_delegate_email
+ threepid = yield identity_handler.threepid_from_creds(
+ self.hs.config.account_threepid_delegate_email, threepid_creds
+ )
+ elif self.hs.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
+ threepid = None
+ row = yield self.store.get_threepid_validation_session(
+ medium,
+ threepid_creds["client_secret"],
+ sid=threepid_creds["sid"],
+ validated=True,
+ )
+
+ if row:
+ threepid = {
+ "medium": row["medium"],
+ "address": row["address"],
+ "validated_at": row["validated_at"],
+ }
+
+ # Valid threepid returned, delete from the db
+ yield self.store.delete_threepid_session(threepid_creds["sid"])
+ else:
+ raise SynapseError(
+ 400, "Email address verification is not enabled on this homeserver"
+ )
+ else:
+ # this can't happen!
+ raise AssertionError("Unrecognized threepid medium: %s" % (medium,))
+
+ if not threepid:
+ raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+
+ if threepid["medium"] != medium:
+ raise LoginError(
+ 401,
+ "Expecting threepid of type '%s', got '%s'"
+ % (medium, threepid["medium"]),
+ errcode=Codes.UNAUTHORIZED,
+ )
+
+ threepid["threepid_creds"] = authdict["threepid_creds"]
+
+ return threepid
+
+
+class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
+ AUTH_TYPE = LoginType.EMAIL_IDENTITY
+
+ def __init__(self, hs):
+ UserInteractiveAuthChecker.__init__(self, hs)
+ _BaseThreepidAuthChecker.__init__(self, hs)
+
+ def is_enabled(self):
+ return self.hs.config.threepid_behaviour_email in (
+ ThreepidBehaviour.REMOTE,
+ ThreepidBehaviour.LOCAL,
+ )
+
+ def check_auth(self, authdict, clientip):
+ return self._check_threepid("email", authdict)
+
+
+class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
+ AUTH_TYPE = LoginType.MSISDN
+
+ def __init__(self, hs):
+ UserInteractiveAuthChecker.__init__(self, hs)
+ _BaseThreepidAuthChecker.__init__(self, hs)
+
+ def is_enabled(self):
+ return bool(self.hs.config.account_threepid_delegate_msisdn)
+
+ def check_auth(self, authdict, clientip):
+ return self._check_threepid("msisdn", authdict)
+
+
+INTERACTIVE_AUTH_CHECKERS = [
+ DummyAuthChecker,
+ TermsAuthChecker,
+ RecaptchaAuthChecker,
+ EmailIdentityAuthChecker,
+ MsisdnAuthChecker,
+]
+"""A list of UserInteractiveAuthChecker classes"""
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index e53669e40d..624f05ab5b 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -138,21 +138,28 @@ class UserDirectoryHandler(StateDeltasHandler):
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "user_dir_delta"):
- deltas = yield self.store.get_current_state_deltas(self.pos)
- if not deltas:
+ room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+ if self.pos == room_max_stream_ordering:
return
+ logger.debug(
+ "Processing user stats %s->%s", self.pos, room_max_stream_ordering
+ )
+ max_pos, deltas = yield self.store.get_current_state_deltas(
+ self.pos, room_max_stream_ordering
+ )
+
logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
- self.pos = deltas[-1]["stream_id"]
+ self.pos = max_pos
# Expose current event processing position to prometheus
synapse.metrics.event_processing_positions.labels("user_dir").set(
- self.pos
+ max_pos
)
- yield self.store.update_user_directory_stream_pos(self.pos)
+ yield self.store.update_user_directory_stream_pos(max_pos)
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
|