diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index be3bb59431..9b4acd2ed7 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -743,18 +743,9 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def send_invite(self, destination, room_id, event_id, pdu):
- time_now = self._clock.time_msec()
- try:
- code, content = yield self.transport_layer.send_invite(
- destination=destination,
- room_id=room_id,
- event_id=event_id,
- content=pdu.get_pdu_json(time_now),
- )
- except HttpResponseException as e:
- if e.code == 403:
- raise e.to_synapse_error()
- raise
+ room_version = yield self.store.get_room_version(room_id)
+
+ content = yield self._do_send_invite(destination, pdu, room_version)
pdu_dict = content["event"]
@@ -772,6 +763,55 @@ class FederationClient(FederationBase):
defer.returnValue(pdu)
+ @defer.inlineCallbacks
+ def _do_send_invite(self, destination, pdu, room_version):
+ """Actually sends the invite, first trying v2 API and falling back to
+ v1 API if necessary.
+
+ Args:
+ destination (str): Target server
+ pdu (FrozenEvent)
+ room_version (str)
+
+ Returns:
+ dict: The event as a dict as returned by the remote server
+ """
+ time_now = self._clock.time_msec()
+
+ try:
+ content = yield self.transport_layer.send_invite_v2(
+ destination=destination,
+ room_id=pdu.room_id,
+ event_id=pdu.event_id,
+ content={
+ "event": pdu.get_pdu_json(time_now),
+ "room_version": room_version,
+ "invite_room_state": pdu.unsigned.get("invite_room_state", []),
+ },
+ )
+ defer.returnValue(content)
+ except HttpResponseException as e:
+ if e.code in [400, 404]:
+ if room_version in (RoomVersions.V1, RoomVersions.V2):
+ pass # We'll fall through
+ else:
+ raise Exception("Remote server is too old")
+ elif e.code == 403:
+ raise e.to_synapse_error()
+ else:
+ raise
+
+ # Didn't work, try v1 API.
+ # Note the v1 API returns a tuple of `(200, content)`
+
+ _, content = yield self.transport_layer.send_invite_v1(
+ destination=destination,
+ room_id=pdu.room_id,
+ event_id=pdu.event_id,
+ content=pdu.get_pdu_json(time_now),
+ )
+ defer.returnValue(content)
+
def send_leave(self, destinations, pdu):
"""Sends a leave event to one of a list of homeservers.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 4aa04b9588..6681614232 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -322,7 +322,7 @@ class FederationServer(FederationBase):
if self.hs.is_mine_id(event.event_id):
event.signatures.update(
compute_event_signature(
- event,
+ event.get_pdu_json(),
self.hs.hostname,
self.hs.config.signing_key[0]
)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 260178c47b..8e2be218e2 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -21,7 +21,7 @@ from six.moves import urllib
from twisted.internet import defer
from synapse.api.constants import Membership
-from synapse.api.urls import FEDERATION_V1_PREFIX
+from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
from synapse.util.logutils import log_function
logger = logging.getLogger(__name__)
@@ -289,7 +289,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
- def send_invite(self, destination, room_id, event_id, content):
+ def send_invite_v1(self, destination, room_id, event_id, content):
path = _create_v1_path("/invite/%s/%s", room_id, event_id)
response = yield self.client.put_json(
@@ -303,6 +303,20 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
+ def send_invite_v2(self, destination, room_id, event_id, content):
+ path = _create_v2_path("/invite/%s/%s", room_id, event_id)
+
+ response = yield self.client.put_json(
+ destination=destination,
+ path=path,
+ data=content,
+ ignore_backoff=True,
+ )
+
+ defer.returnValue(response)
+
+ @defer.inlineCallbacks
+ @log_function
def get_public_rooms(self, remote_server, limit, since_token,
search_filter=None, include_all_networks=False,
third_party_instance_id=None):
@@ -958,3 +972,24 @@ def _create_v1_path(path, *args):
FEDERATION_V1_PREFIX
+ path % tuple(urllib.parse.quote(arg, "") for arg in args)
)
+
+
+def _create_v2_path(path, *args):
+ """Creates a path against V2 federation API from the path template and
+ args. Ensures that all args are url encoded.
+
+ Example:
+
+ _create_v2_path("/event/%s/", event_id)
+
+ Args:
+ path (str): String template for the path
+ args: ([str]): Args to insert into path. Each arg will be url encoded
+
+ Returns:
+ str
+ """
+ return (
+ FEDERATION_V2_PREFIX
+ + path % tuple(urllib.parse.quote(arg, "") for arg in args)
+ )
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index adf59db7a8..fcaf7530b0 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1300,7 +1300,7 @@ class FederationHandler(BaseHandler):
event.signatures.update(
compute_event_signature(
- event,
+ event.get_pdu_json(),
self.hs.hostname,
self.hs.config.signing_key[0]
)
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 4a6f634c8b..07c72c9351 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import cgi
+import json
import logging
import attr
@@ -20,7 +22,7 @@ from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
-from twisted.web.client import URI, Agent, HTTPConnectionPool
+from twisted.web.client import URI, Agent, HTTPConnectionPool, readBody
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent
@@ -43,13 +45,19 @@ class MatrixFederationAgent(object):
tls_client_options_factory (ClientTLSOptionsFactory|None):
factory to use for fetching client tls options, or none to disable TLS.
+ _well_known_tls_policy (IPolicyForHTTPS|None):
+ TLS policy to use for fetching .well-known files. None to use a default
+ (browser-like) implementation.
+
srv_resolver (SrvResolver|None):
SRVResolver impl to use for looking up SRV records. None to use a default
implementation.
"""
def __init__(
- self, reactor, tls_client_options_factory, _srv_resolver=None,
+ self, reactor, tls_client_options_factory,
+ _well_known_tls_policy=None,
+ _srv_resolver=None,
):
self._reactor = reactor
self._tls_client_options_factory = tls_client_options_factory
@@ -62,6 +70,14 @@ class MatrixFederationAgent(object):
self._pool.maxPersistentPerHost = 5
self._pool.cachedConnectionTimeout = 2 * 60
+ agent_args = {}
+ if _well_known_tls_policy is not None:
+ # the param is called 'contextFactory', but actually passing a
+ # contextfactory is deprecated, and it expects an IPolicyForHTTPS.
+ agent_args['contextFactory'] = _well_known_tls_policy
+ _well_known_agent = Agent(self._reactor, pool=self._pool, **agent_args)
+ self._well_known_agent = _well_known_agent
+
@defer.inlineCallbacks
def request(self, method, uri, headers=None, bodyProducer=None):
"""
@@ -114,7 +130,11 @@ class MatrixFederationAgent(object):
class EndpointFactory(object):
@staticmethod
def endpointForURI(_uri):
- logger.info("Connecting to %s:%s", res.target_host, res.target_port)
+ logger.info(
+ "Connecting to %s:%i",
+ res.target_host.decode("ascii"),
+ res.target_port,
+ )
ep = HostnameEndpoint(self._reactor, res.target_host, res.target_port)
if tls_options is not None:
ep = wrapClientTLS(tls_options, ep)
@@ -127,7 +147,7 @@ class MatrixFederationAgent(object):
defer.returnValue(res)
@defer.inlineCallbacks
- def _route_matrix_uri(self, parsed_uri):
+ def _route_matrix_uri(self, parsed_uri, lookup_well_known=True):
"""Helper for `request`: determine the routing for a Matrix URI
Args:
@@ -135,6 +155,9 @@ class MatrixFederationAgent(object):
parsed with URI.fromBytes(uri, defaultPort=-1) to set the `port` to -1
if there is no explicit port given.
+ lookup_well_known (bool): True if we should look up the .well-known file if
+ there is no SRV record.
+
Returns:
Deferred[_RoutingResult]
"""
@@ -169,6 +192,42 @@ class MatrixFederationAgent(object):
service_name = b"_matrix._tcp.%s" % (parsed_uri.host,)
server_list = yield self._srv_resolver.resolve_service(service_name)
+ if not server_list and lookup_well_known:
+ # try a .well-known lookup
+ well_known_server = yield self._get_well_known(parsed_uri.host)
+
+ if well_known_server:
+ # if we found a .well-known, start again, but don't do another
+ # .well-known lookup.
+
+ # parse the server name in the .well-known response into host/port.
+ # (This code is lifted from twisted.web.client.URI.fromBytes).
+ if b':' in well_known_server:
+ well_known_host, well_known_port = well_known_server.rsplit(b':', 1)
+ try:
+ well_known_port = int(well_known_port)
+ except ValueError:
+ # the part after the colon could not be parsed as an int
+ # - we assume it is an IPv6 literal with no port (the closing
+ # ']' stops it being parsed as an int)
+ well_known_host, well_known_port = well_known_server, -1
+ else:
+ well_known_host, well_known_port = well_known_server, -1
+
+ new_uri = URI(
+ scheme=parsed_uri.scheme,
+ netloc=well_known_server,
+ host=well_known_host,
+ port=well_known_port,
+ path=parsed_uri.path,
+ params=parsed_uri.params,
+ query=parsed_uri.query,
+ fragment=parsed_uri.fragment,
+ )
+
+ res = yield self._route_matrix_uri(new_uri, lookup_well_known=False)
+ defer.returnValue(res)
+
if not server_list:
target_host = parsed_uri.host
port = 8448
@@ -190,6 +249,53 @@ class MatrixFederationAgent(object):
target_port=port,
))
+ @defer.inlineCallbacks
+ def _get_well_known(self, server_name):
+ """Attempt to fetch and parse a .well-known file for the given server
+
+ Args:
+ server_name (bytes): name of the server, from the requested url
+
+ Returns:
+ Deferred[bytes|None]: either the new server name, from the .well-known, or
+ None if there was no .well-known file.
+ """
+ # FIXME: add a cache
+
+ uri = b"https://%s/.well-known/matrix/server" % (server_name, )
+ logger.info("Fetching %s", uri.decode("ascii"))
+ try:
+ response = yield make_deferred_yieldable(
+ self._well_known_agent.request(b"GET", uri),
+ )
+ except Exception as e:
+ logger.info(
+ "Connection error fetching %s: %s",
+ uri.decode("ascii"), e,
+ )
+ defer.returnValue(None)
+
+ body = yield make_deferred_yieldable(readBody(response))
+
+ if response.code != 200:
+ logger.info(
+ "Error response %i from %s: %s",
+ response.code, uri.decode("ascii"), body,
+ )
+ defer.returnValue(None)
+
+ content_types = response.headers.getRawHeaders(u'content-type')
+ if content_types is None:
+ raise Exception("no content-type header on .well-known response")
+ content_type, _opts = cgi.parse_header(content_types[-1])
+ if content_type != 'application/json':
+ raise Exception("content-type not application/json on .well-known response")
+ parsed_body = json.loads(body.decode('utf-8'))
+ logger.info("Response from .well-known: %s", parsed_body)
+ if not isinstance(parsed_body, dict) or "m.server" not in parsed_body:
+ raise Exception("invalid .well-known response")
+ defer.returnValue(parsed_body["m.server"].encode("ascii"))
+
@attr.s
class _RoutingResult(object):
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 430bb15f51..f0e4a0e10c 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -201,7 +201,7 @@ class Linearizer(object):
if entry[0] >= self.max_count:
res = self._await_lock(key)
else:
- logger.info(
+ logger.debug(
"Acquired uncontended linearizer lock %r for key %r", self.name, key,
)
entry[0] += 1
@@ -215,7 +215,7 @@ class Linearizer(object):
try:
yield
finally:
- logger.info("Releasing linearizer lock %r for key %r", self.name, key)
+ logger.debug("Releasing linearizer lock %r for key %r", self.name, key)
# We've finished executing so check if there are any things
# blocked waiting to execute and start one of them
@@ -247,7 +247,7 @@ class Linearizer(object):
"""
entry = self.key_to_defer[key]
- logger.info(
+ logger.debug(
"Waiting to acquire linearizer lock %r for key %r", self.name, key,
)
@@ -255,7 +255,7 @@ class Linearizer(object):
entry[1][new_defer] = 1
def cb(_r):
- logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+ logger.debug("Acquired linearizer lock %r for key %r", self.name, key)
entry[0] += 1
# if the code holding the lock completes synchronously, then it
@@ -273,7 +273,7 @@ class Linearizer(object):
def eb(e):
logger.info("defer %r got err %r", new_defer, e)
if isinstance(e, CancelledError):
- logger.info(
+ logger.debug(
"Cancelling wait for linearizer lock %r for key %r",
self.name, key,
)
|