diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 723f571284..d4f586fae7 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -26,7 +26,6 @@ from synapse.api.errors import (
from synapse.util import unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
-from synapse.util import third_party_invites
from synapse.events import FrozenEvent
import synapse.metrics
@@ -358,7 +357,8 @@ class FederationClient(FederationBase):
defer.returnValue(signed_auth)
@defer.inlineCallbacks
- def make_membership_event(self, destinations, room_id, user_id, membership, content):
+ def make_membership_event(self, destinations, room_id, user_id, membership,
+ content={},):
"""
Creates an m.room.member event, with context, without participating in the room.
@@ -390,20 +390,17 @@ class FederationClient(FederationBase):
if destination == self.server_name:
continue
- args = {}
- if third_party_invites.join_has_third_party_invite(content):
- args = third_party_invites.extract_join_keys(
- content["third_party_invite"]
- )
try:
ret = yield self.transport_layer.make_membership_event(
- destination, room_id, user_id, membership, args
+ destination, room_id, user_id, membership
)
pdu_dict = ret["event"]
logger.debug("Got response to make_%s: %s", membership, pdu_dict)
+ pdu_dict["content"].update(content)
+
defer.returnValue(
(destination, self.event_from_pdu_json(pdu_dict))
)
@@ -704,3 +701,26 @@ class FederationClient(FederationBase):
event.internal_metadata.outlier = outlier
return event
+
+ @defer.inlineCallbacks
+ def forward_third_party_invite(self, destinations, room_id, event_dict):
+ for destination in destinations:
+ if destination == self.server_name:
+ continue
+
+ try:
+ yield self.transport_layer.exchange_third_party_invite(
+ destination=destination,
+ room_id=room_id,
+ event_dict=event_dict,
+ )
+ defer.returnValue(None)
+ except CodeMessageException:
+ raise
+ except Exception as e:
+ logger.exception(
+ "Failed to send_third_party_invite via %s: %s",
+ destination, e.message
+ )
+
+ raise RuntimeError("Failed to send to any server.")
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9e2d9ee74c..7a59436a91 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -23,12 +23,10 @@ from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
import synapse.metrics
-from synapse.api.errors import FederationError, SynapseError, Codes
+from synapse.api.errors import FederationError, SynapseError
from synapse.crypto.event_signing import compute_event_signature
-from synapse.util import third_party_invites
-
import simplejson as json
import logging
@@ -230,19 +228,8 @@ class FederationServer(FederationBase):
)
@defer.inlineCallbacks
- def on_make_join_request(self, room_id, user_id, query):
- threepid_details = {}
- if third_party_invites.has_join_keys(query):
- for k in third_party_invites.JOIN_KEYS:
- if not isinstance(query[k], list) or len(query[k]) != 1:
- raise FederationError(
- "FATAL",
- Codes.MISSING_PARAM,
- "key %s value %s" % (k, query[k],),
- None
- )
- threepid_details[k] = query[k][0]
- pdu = yield self.handler.on_make_join_request(room_id, user_id, threepid_details)
+ def on_make_join_request(self, room_id, user_id):
+ pdu = yield self.handler.on_make_join_request(room_id, user_id)
time_now = self._clock.time_msec()
defer.returnValue({"event": pdu.get_pdu_json(time_now)})
@@ -556,3 +543,15 @@ class FederationServer(FederationBase):
event.internal_metadata.outlier = outlier
return event
+
+ @defer.inlineCallbacks
+ def exchange_third_party_invite(self, invite):
+ ret = yield self.handler.exchange_third_party_invite(invite)
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
+ ret = yield self.handler.on_exchange_third_party_invite_request(
+ origin, room_id, event_dict
+ )
+ defer.returnValue(ret)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 32fa5e8c15..aac6f1c167 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -202,6 +202,7 @@ class TransactionQueue(object):
@defer.inlineCallbacks
@log_function
def _attempt_new_transaction(self, destination):
+ # list of (pending_pdu, deferred, order)
if destination in self.pending_transactions:
# XXX: pending_transactions can get stuck on by a never-ending
# request at which point pending_pdus_by_dest just keeps growing.
@@ -213,9 +214,6 @@ class TransactionQueue(object):
)
return
- logger.debug("TX [%s] _attempt_new_transaction", destination)
-
- # list of (pending_pdu, deferred, order)
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_failures = self.pending_failures_by_dest.pop(destination, [])
@@ -228,20 +226,22 @@ class TransactionQueue(object):
logger.debug("TX [%s] Nothing to send", destination)
return
- # Sort based on the order field
- pending_pdus.sort(key=lambda t: t[2])
-
- pdus = [x[0] for x in pending_pdus]
- edus = [x[0] for x in pending_edus]
- failures = [x[0].get_dict() for x in pending_failures]
- deferreds = [
- x[1]
- for x in pending_pdus + pending_edus + pending_failures
- ]
-
try:
self.pending_transactions[destination] = 1
+ logger.debug("TX [%s] _attempt_new_transaction", destination)
+
+ # Sort based on the order field
+ pending_pdus.sort(key=lambda t: t[2])
+
+ pdus = [x[0] for x in pending_pdus]
+ edus = [x[0] for x in pending_edus]
+ failures = [x[0].get_dict() for x in pending_failures]
+ deferreds = [
+ x[1]
+ for x in pending_pdus + pending_edus + pending_failures
+ ]
+
txn_id = str(self._next_txn_id)
limiter = yield get_retry_limiter(
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index a81b3c4345..3d59e1c650 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -161,7 +161,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
- def make_membership_event(self, destination, room_id, user_id, membership, args={}):
+ def make_membership_event(self, destination, room_id, user_id, membership):
valid_memberships = {Membership.JOIN, Membership.LEAVE}
if membership not in valid_memberships:
raise RuntimeError(
@@ -173,7 +173,6 @@ class TransportLayerClient(object):
content = yield self.client.get_json(
destination=destination,
path=path,
- args=args,
retry_on_dns_fail=True,
)
@@ -220,6 +219,19 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
+ def exchange_third_party_invite(self, destination, room_id, event_dict):
+ path = PREFIX + "/exchange_third_party_invite/%s" % (room_id,)
+
+ response = yield self.client.put_json(
+ destination=destination,
+ path=path,
+ data=event_dict,
+ )
+
+ defer.returnValue(response)
+
+ @defer.inlineCallbacks
+ @log_function
def get_event_auth(self, destination, room_id, event_id):
path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 8184159210..127b4da4f8 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -292,7 +292,7 @@ class FederationMakeJoinServlet(BaseFederationServlet):
@defer.inlineCallbacks
def on_GET(self, origin, content, query, context, user_id):
- content = yield self.handler.on_make_join_request(context, user_id, query)
+ content = yield self.handler.on_make_join_request(context, user_id)
defer.returnValue((200, content))
@@ -343,6 +343,17 @@ class FederationInviteServlet(BaseFederationServlet):
defer.returnValue((200, content))
+class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
+ PATH = "/exchange_third_party_invite/([^/]*)"
+
+ @defer.inlineCallbacks
+ def on_PUT(self, origin, content, query, room_id):
+ content = yield self.handler.on_exchange_third_party_invite_request(
+ origin, room_id, content
+ )
+ defer.returnValue((200, content))
+
+
class FederationClientKeysQueryServlet(BaseFederationServlet):
PATH = "/user/keys/query"
@@ -396,6 +407,30 @@ class FederationGetMissingEventsServlet(BaseFederationServlet):
defer.returnValue((200, content))
+class On3pidBindServlet(BaseFederationServlet):
+ PATH = "/3pid/onbind"
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ content_bytes = request.content.read()
+ content = json.loads(content_bytes)
+ if "invites" in content:
+ last_exception = None
+ for invite in content["invites"]:
+ try:
+ yield self.handler.exchange_third_party_invite(invite)
+ except Exception as e:
+ last_exception = e
+ if last_exception:
+ raise last_exception
+ defer.returnValue((200, {}))
+
+ # Avoid doing remote HS authorization checks which are done by default by
+ # BaseFederationServlet.
+ def _wrap(self, code):
+ return code
+
+
SERVLET_CLASSES = (
FederationPullServlet,
FederationEventServlet,
@@ -413,4 +448,6 @@ SERVLET_CLASSES = (
FederationEventAuthServlet,
FederationClientKeysQueryServlet,
FederationClientKeysClaimServlet,
+ FederationThirdPartyInviteExchangeServlet,
+ On3pidBindServlet,
)
|