diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 7550e11b6e..c9f3c2d352 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -25,7 +25,7 @@ from prometheus_client import Counter
from twisted.internet import defer
-from synapse.api.constants import Membership
+from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
from synapse.api.errors import (
CodeMessageException,
FederationDeniedError,
@@ -518,10 +518,10 @@ class FederationClient(FederationBase):
description, destination, exc_info=1,
)
- raise RuntimeError("Failed to %s via any server", description)
+ raise RuntimeError("Failed to %s via any server" % (description, ))
def make_membership_event(self, destinations, room_id, user_id, membership,
- content={},):
+ content, params):
"""
Creates an m.room.member event, with context, without participating in the room.
@@ -537,8 +537,10 @@ class FederationClient(FederationBase):
user_id (str): The user whose membership is being evented.
membership (str): The "membership" property of the event. Must be
one of "join" or "leave".
- content (object): Any additional data to put into the content field
+ content (dict): Any additional data to put into the content field
of the event.
+ params (dict[str, str|Iterable[str]]): Query parameters to include in the
+ request.
Return:
Deferred: resolves to a tuple of (origin (str), event (object))
where origin is the remote homeserver which generated the event.
@@ -558,10 +560,12 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def send_request(destination):
ret = yield self.transport_layer.make_membership_event(
- destination, room_id, user_id, membership
+ destination, room_id, user_id, membership, params,
)
- pdu_dict = ret["event"]
+ pdu_dict = ret.get("event", None)
+ if not isinstance(pdu_dict, dict):
+ raise InvalidResponseError("Bad 'event' field in response")
logger.debug("Got response to make_%s: %s", membership, pdu_dict)
@@ -605,6 +609,26 @@ class FederationClient(FederationBase):
Fails with a ``RuntimeError`` if no servers were reachable.
"""
+ def check_authchain_validity(signed_auth_chain):
+ for e in signed_auth_chain:
+ if e.type == EventTypes.Create:
+ create_event = e
+ break
+ else:
+ raise InvalidResponseError(
+ "no %s in auth chain" % (EventTypes.Create,),
+ )
+
+ # the room version should be sane.
+ room_version = create_event.content.get("room_version", "1")
+ if room_version not in KNOWN_ROOM_VERSIONS:
+ # This shouldn't be possible, because the remote server should have
+ # rejected the join attempt during make_join.
+ raise InvalidResponseError(
+ "room appears to have unsupported version %s" % (
+ room_version,
+ ))
+
@defer.inlineCallbacks
def send_request(destination):
time_now = self._clock.time_msec()
@@ -661,7 +685,7 @@ class FederationClient(FederationBase):
for s in signed_state:
s.internal_metadata = copy.deepcopy(s.internal_metadata)
- auth_chain.sort(key=lambda e: e.depth)
+ check_authchain_validity(signed_auth)
defer.returnValue({
"state": signed_state,
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 941e30a596..d23c1cf13b 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -27,7 +27,13 @@ from twisted.internet.abstract import isIPAddress
from twisted.python import failure
from synapse.api.constants import EventTypes
-from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError
+from synapse.api.errors import (
+ AuthError,
+ FederationError,
+ IncompatibleRoomVersionError,
+ NotFoundError,
+ SynapseError,
+)
from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
@@ -327,12 +333,21 @@ class FederationServer(FederationBase):
defer.returnValue((200, resp))
@defer.inlineCallbacks
- def on_make_join_request(self, origin, room_id, user_id):
+ def on_make_join_request(self, origin, room_id, user_id, supported_versions):
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id)
+
+ room_version = yield self.store.get_room_version(room_id)
+ if room_version not in supported_versions:
+ logger.warn("Room version %s not in %s", room_version, supported_versions)
+ raise IncompatibleRoomVersionError(room_version=room_version)
+
pdu = yield self.handler.on_make_join_request(room_id, user_id)
time_now = self._clock.time_msec()
- defer.returnValue({"event": pdu.get_pdu_json(time_now)})
+ defer.returnValue({
+ "event": pdu.get_pdu_json(time_now),
+ "room_version": room_version,
+ })
@defer.inlineCallbacks
def on_invite_request(self, origin, content):
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 78f9d40a3a..f603c8a368 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -26,6 +26,8 @@ from synapse.api.errors import FederationDeniedError, HttpResponseException
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
from synapse.metrics import (
LaterGauge,
+ event_processing_loop_counter,
+ event_processing_loop_room_count,
events_processed_counter,
sent_edus_counter,
sent_transactions_counter,
@@ -253,7 +255,13 @@ class TransactionQueue(object):
synapse.metrics.event_processing_last_ts.labels(
"federation_sender").set(ts)
- events_processed_counter.inc(len(events))
+ events_processed_counter.inc(len(events))
+
+ event_processing_loop_room_count.labels(
+ "federation_sender"
+ ).inc(len(events_by_room))
+
+ event_processing_loop_counter.labels("federation_sender").inc()
synapse.metrics.event_processing_positions.labels(
"federation_sender").set(next_token)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 4529d454af..b4fbe2c9d5 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -195,7 +195,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
- def make_membership_event(self, destination, room_id, user_id, membership):
+ def make_membership_event(self, destination, room_id, user_id, membership, params):
"""Asks a remote server to build and sign us a membership event
Note that this does not append any events to any graphs.
@@ -205,6 +205,8 @@ class TransportLayerClient(object):
room_id (str): room to join/leave
user_id (str): user to be joined/left
membership (str): one of join/leave
+ params (dict[str, str|Iterable[str]]): Query parameters to include in the
+ request.
Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result
@@ -241,6 +243,7 @@ class TransportLayerClient(object):
content = yield self.client.get_json(
destination=destination,
path=path,
+ args=params,
retry_on_dns_fail=retry_on_dns_fail,
timeout=20000,
ignore_backoff=ignore_backoff,
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index eae5f2b427..77969a4f38 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -190,6 +190,41 @@ def _parse_auth_header(header_bytes):
class BaseFederationServlet(object):
+ """Abstract base class for federation servlet classes.
+
+ The servlet object should have a PATH attribute which takes the form of a regexp to
+ match against the request path (excluding the /federation/v1 prefix).
+
+ The servlet should also implement one or more of on_GET, on_POST, on_PUT, to match
+ the appropriate HTTP method. These methods have the signature:
+
+ on_<METHOD>(self, origin, content, query, **kwargs)
+
+ With arguments:
+
+ origin (unicode|None): The authenticated server_name of the calling server,
+ unless REQUIRE_AUTH is set to False and authentication failed.
+
+ content (unicode|None): decoded json body of the request. None if the
+ request was a GET.
+
+ query (dict[bytes, list[bytes]]): Query params from the request. url-decoded
+ (ie, '+' and '%xx' are decoded) but note that it is *not* utf8-decoded
+ yet.
+
+ **kwargs (dict[unicode, unicode]): the dict mapping keys to path
+ components as specified in the path match regexp.
+
+ Returns:
+ Deferred[(int, object)|None]: either (response code, response object) to
+ return a JSON response, or None if the request has already been handled.
+
+ Raises:
+ SynapseError: to return an error code
+
+ Exception: other exceptions will be caught, logged, and a 500 will be
+ returned.
+ """
REQUIRE_AUTH = True
def __init__(self, handler, authenticator, ratelimiter, server_name):
@@ -204,6 +239,18 @@ class BaseFederationServlet(object):
@defer.inlineCallbacks
@functools.wraps(func)
def new_func(request, *args, **kwargs):
+ """ A callback which can be passed to HttpServer.RegisterPaths
+
+ Args:
+ request (twisted.web.http.Request):
+ *args: unused?
+ **kwargs (dict[unicode, unicode]): the dict mapping keys to path
+ components as specified in the path match regexp.
+
+ Returns:
+ Deferred[(int, object)|None]: (response code, response object) as returned
+ by the callback method. None if the request has already been handled.
+ """
content = None
if request.method in ["PUT", "POST"]:
# TODO: Handle other method types? other content types?
@@ -384,9 +431,31 @@ class FederationMakeJoinServlet(BaseFederationServlet):
PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
@defer.inlineCallbacks
- def on_GET(self, origin, content, query, context, user_id):
+ def on_GET(self, origin, _content, query, context, user_id):
+ """
+ Args:
+ origin (unicode): The authenticated server_name of the calling server
+
+ _content (None): (GETs don't have bodies)
+
+ query (dict[bytes, list[bytes]]): Query params from the request.
+
+ **kwargs (dict[unicode, unicode]): the dict mapping keys to path
+ components as specified in the path match regexp.
+
+ Returns:
+ Deferred[(int, object)|None]: either (response code, response object) to
+ return a JSON response, or None if the request has already been handled.
+ """
+ versions = query.get(b'ver')
+ if versions is not None:
+ supported_versions = [v.decode("utf-8") for v in versions]
+ else:
+ supported_versions = ["1"]
+
content = yield self.handler.on_make_join_request(
origin, context, user_id,
+ supported_versions=supported_versions,
)
defer.returnValue((200, content))
|