diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index e7a1bb7246..fffba34383 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
-# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -48,7 +47,6 @@ class AuthHandler(BaseHandler):
LoginType.PASSWORD: self._check_password_auth,
LoginType.RECAPTCHA: self._check_recaptcha,
LoginType.EMAIL_IDENTITY: self._check_email_identity,
- LoginType.MSISDN: self._check_msisdn,
LoginType.DUMMY: self._check_dummy_auth,
}
self.bcrypt_rounds = hs.config.bcrypt_rounds
@@ -309,47 +307,31 @@ class AuthHandler(BaseHandler):
defer.returnValue(True)
raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
- def _check_email_identity(self, authdict, _):
- return self._check_threepid('email', authdict)
-
- def _check_msisdn(self, authdict, _):
- return self._check_threepid('msisdn', authdict)
-
@defer.inlineCallbacks
- def _check_dummy_auth(self, authdict, _):
- yield run_on_reactor()
- defer.returnValue(True)
-
- @defer.inlineCallbacks
- def _check_threepid(self, medium, authdict):
+ def _check_email_identity(self, authdict, _):
yield run_on_reactor()
if 'threepid_creds' not in authdict:
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
threepid_creds = authdict['threepid_creds']
-
identity_handler = self.hs.get_handlers().identity_handler
- logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
+ logger.info("Getting validated threepid. threepidcreds: %r" % (threepid_creds,))
threepid = yield identity_handler.threepid_from_creds(threepid_creds)
if not threepid:
raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
- if threepid['medium'] != medium:
- raise LoginError(
- 401,
- "Expecting threepid of type '%s', got '%s'" % (
- medium, threepid['medium'],
- ),
- errcode=Codes.UNAUTHORIZED
- )
-
threepid['threepid_creds'] = authdict['threepid_creds']
defer.returnValue(threepid)
+ @defer.inlineCallbacks
+ def _check_dummy_auth(self, authdict, _):
+ yield run_on_reactor()
+ defer.returnValue(True)
+
def _get_params_recaptcha(self):
return {"public_key": self.hs.config.recaptcha_public_key}
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9374c085db..1b007d4945 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -170,6 +170,40 @@ class DeviceHandler(BaseHandler):
yield self.notify_device_update(user_id, [device_id])
@defer.inlineCallbacks
+ def delete_devices(self, user_id, device_ids):
+ """ Delete several devices
+
+ Args:
+ user_id (str):
+ device_ids (str): The list of device IDs to delete
+
+ Returns:
+ defer.Deferred:
+ """
+
+ try:
+ yield self.store.delete_devices(user_id, device_ids)
+ except errors.StoreError, e:
+ if e.code == 404:
+ # no match
+ pass
+ else:
+ raise
+
+ # Delete access tokens and e2e keys for each device. Not optimised as it is not
+ # considered as part of a critical path.
+ for device_id in device_ids:
+ yield self.store.user_delete_access_tokens(
+ user_id, device_id=device_id,
+ delete_refresh_tokens=True,
+ )
+ yield self.store.delete_e2e_keys_by_device(
+ user_id=user_id, device_id=device_id
+ )
+
+ yield self.notify_device_update(user_id, device_ids)
+
+ @defer.inlineCallbacks
def update_device(self, user_id, device_id, content):
""" Update the given device
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ed0fa51e7f..d0c2b4d6ed 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -31,7 +31,7 @@ from synapse.util.logcontext import (
)
from synapse.util.metrics import measure_func
from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor
+from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures,
@@ -79,12 +79,204 @@ class FederationHandler(BaseHandler):
# When joining a room we need to queue any events for that room up
self.room_queues = {}
+ self._room_pdu_linearizer = Linearizer("fed_room_pdu")
+
+ @defer.inlineCallbacks
+ @log_function
+ def on_receive_pdu(self, origin, pdu, get_missing=True):
+ """ Process a PDU received via a federation /send/ transaction, or
+ via backfill of missing prev_events
+
+ Args:
+ origin (str): server which initiated the /send/ transaction. Will
+ be used to fetch missing events or state.
+ pdu (FrozenEvent): received PDU
+ get_missing (bool): True if we should fetch missing prev_events
+
+ Returns (Deferred): completes with None
+ """
+
+ # We reprocess pdus when we have seen them only as outliers
+ existing = yield self.get_persisted_pdu(
+ origin, pdu.event_id, do_auth=False
+ )
+
+ # FIXME: Currently we fetch an event again when we already have it
+ # if it has been marked as an outlier.
+
+ already_seen = (
+ existing and (
+ not existing.internal_metadata.is_outlier()
+ or pdu.internal_metadata.is_outlier()
+ )
+ )
+ if already_seen:
+ logger.debug("Already seen pdu %s", pdu.event_id)
+ return
+
+ state = None
+
+ auth_chain = []
+
+ have_seen = yield self.store.have_events(
+ [ev for ev, _ in pdu.prev_events]
+ )
+
+ fetch_state = False
+
+ # Get missing pdus if necessary.
+ if not pdu.internal_metadata.is_outlier():
+ # We only backfill backwards to the min depth.
+ min_depth = yield self.get_min_depth_for_context(
+ pdu.room_id
+ )
+
+ logger.debug(
+ "_handle_new_pdu min_depth for %s: %d",
+ pdu.room_id, min_depth
+ )
+
+ prevs = {e_id for e_id, _ in pdu.prev_events}
+ seen = set(have_seen.keys())
+
+ if min_depth and pdu.depth < min_depth:
+ # This is so that we don't notify the user about this
+ # message, to work around the fact that some events will
+ # reference really really old events we really don't want to
+ # send to the clients.
+ pdu.internal_metadata.outlier = True
+ elif min_depth and pdu.depth > min_depth:
+ if get_missing and prevs - seen:
+ # If we're missing stuff, ensure we only fetch stuff one
+ # at a time.
+ logger.info(
+ "Acquiring lock for room %r to fetch %d missing events: %r...",
+ pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+ )
+ with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
+ logger.info(
+ "Acquired lock for room %r to fetch %d missing events",
+ pdu.room_id, len(prevs - seen),
+ )
+
+ yield self._get_missing_events_for_pdu(
+ origin, pdu, prevs, min_depth
+ )
+
+ prevs = {e_id for e_id, _ in pdu.prev_events}
+ seen = set(have_seen.keys())
+ if prevs - seen:
+ logger.info(
+ "Still missing %d events for room %r: %r...",
+ len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ )
+ fetch_state = True
+
+ if fetch_state:
+ # We need to get the state at this event, since we haven't
+ # processed all the prev events.
+ logger.debug(
+ "_handle_new_pdu getting state for %s",
+ pdu.room_id
+ )
+ try:
+ state, auth_chain = yield self.replication_layer.get_state_for_room(
+ origin, pdu.room_id, pdu.event_id,
+ )
+ except:
+ logger.exception("Failed to get state for event: %s", pdu.event_id)
+
+ yield self._process_received_pdu(
+ origin,
+ pdu,
+ state=state,
+ auth_chain=auth_chain,
+ )
+
+ @defer.inlineCallbacks
+ def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
+ """
+ Args:
+ origin (str): Origin of the pdu. Will be called to get the missing events
+ pdu: received pdu
+ prevs (str[]): List of event ids which we are missing
+ min_depth (int): Minimum depth of events to return.
+
+ Returns:
+ Deferred<dict(str, str?)>: updated have_seen dictionary
+ """
+ # We recalculate seen, since it may have changed.
+ have_seen = yield self.store.have_events(prevs)
+ seen = set(have_seen.keys())
+
+ if not prevs - seen:
+ # nothing left to do
+ defer.returnValue(have_seen)
+
+ latest = yield self.store.get_latest_event_ids_in_room(
+ pdu.room_id
+ )
+
+ # We add the prev events that we have seen to the latest
+ # list to ensure the remote server doesn't give them to us
+ latest = set(latest)
+ latest |= seen
+
+ logger.info(
+ "Missing %d events for room %r: %r...",
+ len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ )
+
+ # XXX: we set timeout to 10s to help workaround
+ # https://github.com/matrix-org/synapse/issues/1733.
+ # The reason is to avoid holding the linearizer lock
+ # whilst processing inbound /send transactions, causing
+ # FDs to stack up and block other inbound transactions
+ # which empirically can currently take up to 30 minutes.
+ #
+ # N.B. this explicitly disables retry attempts.
+ #
+ # N.B. this also increases our chances of falling back to
+ # fetching fresh state for the room if the missing event
+ # can't be found, which slightly reduces our security.
+ # it may also increase our DAG extremity count for the room,
+ # causing additional state resolution? See #1760.
+ # However, fetching state doesn't hold the linearizer lock
+ # apparently.
+ #
+ # see https://github.com/matrix-org/synapse/pull/1744
+
+ missing_events = yield self.replication_layer.get_missing_events(
+ origin,
+ pdu.room_id,
+ earliest_events_ids=list(latest),
+ latest_events=[pdu],
+ limit=10,
+ min_depth=min_depth,
+ timeout=10000,
+ )
+
+ # We want to sort these by depth so we process them and
+ # tell clients about them in order.
+ missing_events.sort(key=lambda x: x.depth)
+
+ for e in missing_events:
+ yield self.on_receive_pdu(
+ origin,
+ e,
+ get_missing=False
+ )
+
+ have_seen = yield self.store.have_events(
+ [ev for ev, _ in pdu.prev_events]
+ )
+ defer.returnValue(have_seen)
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
- """ Called by the ReplicationLayer when we have a new pdu. We need to
- do auth checks and put it through the StateHandler.
+ def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None):
+ """ Called when we have a new pdu. We need to do auth checks and put it
+ through the StateHandler.
auth_chain and state are None if we already have the necessary state
and prev_events in the db
@@ -738,7 +930,7 @@ class FederationHandler(BaseHandler):
continue
try:
- self.on_receive_pdu(origin, p)
+ self._process_received_pdu(origin, p)
except:
logger.exception("Couldn't handle pdu")
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 6a53c5eb47..559e5d5a71 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -151,7 +150,7 @@ class IdentityHandler(BaseHandler):
params.update(kwargs)
try:
- data = yield self.http_client.post_json_get_json(
+ data = yield self.http_client.post_urlencoded_get_json(
"https://%s%s" % (
id_server,
"/_matrix/identity/api/v1/validate/email/requestToken"
@@ -162,37 +161,3 @@ class IdentityHandler(BaseHandler):
except CodeMessageException as e:
logger.info("Proxied requestToken failed: %r", e)
raise e
-
- @defer.inlineCallbacks
- def requestMsisdnToken(
- self, id_server, country, phone_number,
- client_secret, send_attempt, **kwargs
- ):
- yield run_on_reactor()
-
- if not self._should_trust_id_server(id_server):
- raise SynapseError(
- 400, "Untrusted ID server '%s'" % id_server,
- Codes.SERVER_NOT_TRUSTED
- )
-
- params = {
- 'country': country,
- 'phone_number': phone_number,
- 'client_secret': client_secret,
- 'send_attempt': send_attempt,
- }
- params.update(kwargs)
-
- try:
- data = yield self.http_client.post_json_get_json(
- "https://%s%s" % (
- id_server,
- "/_matrix/identity/api/v1/validate/msisdn/requestToken"
- ),
- params
- )
- defer.returnValue(data)
- except CodeMessageException as e:
- logger.info("Proxied requestToken failed: %r", e)
- raise e
|