diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index f5e346cdbc..c6a8c1249a 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -17,6 +17,7 @@
from twisted.internet import defer
from .federation_base import FederationBase
+from synapse.api.constants import Membership
from .units import Edu
from synapse.api.errors import (
@@ -356,19 +357,55 @@ class FederationClient(FederationBase):
defer.returnValue(signed_auth)
@defer.inlineCallbacks
- def make_join(self, destinations, room_id, user_id):
+ def make_membership_event(self, destinations, room_id, user_id, membership,
+ content={},):
+ """
+ Creates an m.room.member event, with context, without participating in the room.
+
+ Does so by asking one of the already participating servers to create an
+ event with proper context.
+
+ Note that this does not append any events to any graphs.
+
+ Args:
+ destinations (str): Candidate homeservers which are probably
+ participating in the room.
+ room_id (str): The room in which the event will happen.
+ user_id (str): The user whose membership is being evented.
+ membership (str): The "membership" property of the event. Must be
+ one of "join" or "leave".
+ content (object): Any additional data to put into the content field
+ of the event.
+ Return:
+ A tuple of (origin (str), event (object)) where origin is the remote
+ homeserver which generated the event.
+ """
+ valid_memberships = {Membership.JOIN, Membership.LEAVE}
+ if membership not in valid_memberships:
+ raise RuntimeError(
+ "make_membership_event called with membership='%s', must be one of %s" %
+ (membership, ",".join(valid_memberships))
+ )
for destination in destinations:
if destination == self.server_name:
continue
try:
- ret = yield self.transport_layer.make_join(
- destination, room_id, user_id
+ ret = yield self.transport_layer.make_membership_event(
+ destination, room_id, user_id, membership
)
pdu_dict = ret["event"]
- logger.debug("Got response to make_join: %s", pdu_dict)
+ logger.debug("Got response to make_%s: %s", membership, pdu_dict)
+
+ pdu_dict["content"].update(content)
+
+ # The protoevent received over the JSON wire may not have all
+ # the required fields. Lets just gloss over that because
+ # there's some we never care about
+ if "prev_state" not in pdu_dict:
+ pdu_dict["prev_state"] = []
defer.returnValue(
(destination, self.event_from_pdu_json(pdu_dict))
@@ -378,8 +415,8 @@ class FederationClient(FederationBase):
raise
except Exception as e:
logger.warn(
- "Failed to make_join via %s: %s",
- destination, e.message
+ "Failed to make_%s via %s: %s",
+ membership, destination, e.message
)
raise RuntimeError("Failed to send to any server.")
@@ -486,6 +523,33 @@ class FederationClient(FederationBase):
defer.returnValue(pdu)
@defer.inlineCallbacks
+ def send_leave(self, destinations, pdu):
+ for destination in destinations:
+ if destination == self.server_name:
+ continue
+
+ try:
+ time_now = self._clock.time_msec()
+ _, content = yield self.transport_layer.send_leave(
+ destination=destination,
+ room_id=pdu.room_id,
+ event_id=pdu.event_id,
+ content=pdu.get_pdu_json(time_now),
+ )
+
+ logger.debug("Got content: %s", content)
+ defer.returnValue(None)
+ except CodeMessageException:
+ raise
+ except Exception as e:
+ logger.exception(
+ "Failed to send_leave via %s: %s",
+ destination, e.message
+ )
+
+ raise RuntimeError("Failed to send to any server.")
+
+ @defer.inlineCallbacks
def query_auth(self, destination, room_id, event_id, local_auth):
"""
Params:
@@ -643,3 +707,26 @@ class FederationClient(FederationBase):
event.internal_metadata.outlier = outlier
return event
+
+ @defer.inlineCallbacks
+ def forward_third_party_invite(self, destinations, room_id, event_dict):
+ for destination in destinations:
+ if destination == self.server_name:
+ continue
+
+ try:
+ yield self.transport_layer.exchange_third_party_invite(
+ destination=destination,
+ room_id=room_id,
+ event_dict=event_dict,
+ )
+ defer.returnValue(None)
+ except CodeMessageException:
+ raise
+ except Exception as e:
+ logger.exception(
+ "Failed to send_third_party_invite via %s: %s",
+ destination, e.message
+ )
+
+ raise RuntimeError("Failed to send to any server.")
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 725c6f3fa5..7a59436a91 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -255,6 +255,20 @@ class FederationServer(FederationBase):
}))
@defer.inlineCallbacks
+ def on_make_leave_request(self, room_id, user_id):
+ pdu = yield self.handler.on_make_leave_request(room_id, user_id)
+ time_now = self._clock.time_msec()
+ defer.returnValue({"event": pdu.get_pdu_json(time_now)})
+
+ @defer.inlineCallbacks
+ def on_send_leave_request(self, origin, content):
+ logger.debug("on_send_leave_request: content: %s", content)
+ pdu = self.event_from_pdu_json(content)
+ logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
+ yield self.handler.on_send_leave_request(origin, pdu)
+ defer.returnValue((200, {}))
+
+ @defer.inlineCallbacks
def on_event_auth(self, origin, room_id, event_id):
time_now = self._clock.time_msec()
auth_pdus = yield self.handler.on_event_auth(event_id)
@@ -529,3 +543,15 @@ class FederationServer(FederationBase):
event.internal_metadata.outlier = outlier
return event
+
+ @defer.inlineCallbacks
+ def exchange_third_party_invite(self, invite):
+ ret = yield self.handler.exchange_third_party_invite(invite)
+ defer.returnValue(ret)
+
+ @defer.inlineCallbacks
+ def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
+ ret = yield self.handler.on_exchange_third_party_invite_request(
+ origin, room_id, event_dict
+ )
+ defer.returnValue(ret)
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 32fa5e8c15..aac6f1c167 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -202,6 +202,7 @@ class TransactionQueue(object):
@defer.inlineCallbacks
@log_function
def _attempt_new_transaction(self, destination):
+ # list of (pending_pdu, deferred, order)
if destination in self.pending_transactions:
# XXX: pending_transactions can get stuck on by a never-ending
# request at which point pending_pdus_by_dest just keeps growing.
@@ -213,9 +214,6 @@ class TransactionQueue(object):
)
return
- logger.debug("TX [%s] _attempt_new_transaction", destination)
-
- # list of (pending_pdu, deferred, order)
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_failures = self.pending_failures_by_dest.pop(destination, [])
@@ -228,20 +226,22 @@ class TransactionQueue(object):
logger.debug("TX [%s] Nothing to send", destination)
return
- # Sort based on the order field
- pending_pdus.sort(key=lambda t: t[2])
-
- pdus = [x[0] for x in pending_pdus]
- edus = [x[0] for x in pending_edus]
- failures = [x[0].get_dict() for x in pending_failures]
- deferreds = [
- x[1]
- for x in pending_pdus + pending_edus + pending_failures
- ]
-
try:
self.pending_transactions[destination] = 1
+ logger.debug("TX [%s] _attempt_new_transaction", destination)
+
+ # Sort based on the order field
+ pending_pdus.sort(key=lambda t: t[2])
+
+ pdus = [x[0] for x in pending_pdus]
+ edus = [x[0] for x in pending_edus]
+ failures = [x[0].get_dict() for x in pending_failures]
+ deferreds = [
+ x[1]
+ for x in pending_pdus + pending_edus + pending_failures
+ ]
+
txn_id = str(self._next_txn_id)
limiter = yield get_retry_limiter(
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index ced703364b..3d59e1c650 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -14,6 +14,7 @@
# limitations under the License.
from twisted.internet import defer
+from synapse.api.constants import Membership
from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.util.logutils import log_function
@@ -160,13 +161,19 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
- def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True):
- path = PREFIX + "/make_join/%s/%s" % (room_id, user_id)
+ def make_membership_event(self, destination, room_id, user_id, membership):
+ valid_memberships = {Membership.JOIN, Membership.LEAVE}
+ if membership not in valid_memberships:
+ raise RuntimeError(
+ "make_membership_event called with membership='%s', must be one of %s" %
+ (membership, ",".join(valid_memberships))
+ )
+ path = PREFIX + "/make_%s/%s/%s" % (membership, room_id, user_id)
content = yield self.client.get_json(
destination=destination,
path=path,
- retry_on_dns_fail=retry_on_dns_fail,
+ retry_on_dns_fail=True,
)
defer.returnValue(content)
@@ -186,6 +193,19 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
+ def send_leave(self, destination, room_id, event_id, content):
+ path = PREFIX + "/send_leave/%s/%s" % (room_id, event_id)
+
+ response = yield self.client.put_json(
+ destination=destination,
+ path=path,
+ data=content,
+ )
+
+ defer.returnValue(response)
+
+ @defer.inlineCallbacks
+ @log_function
def send_invite(self, destination, room_id, event_id, content):
path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
@@ -199,6 +219,19 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
+ def exchange_third_party_invite(self, destination, room_id, event_dict):
+ path = PREFIX + "/exchange_third_party_invite/%s" % (room_id,)
+
+ response = yield self.client.put_json(
+ destination=destination,
+ path=path,
+ data=event_dict,
+ )
+
+ defer.returnValue(response)
+
+ @defer.inlineCallbacks
+ @log_function
def get_event_auth(self, destination, room_id, event_id):
path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 36f250e1a3..127b4da4f8 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -296,6 +296,24 @@ class FederationMakeJoinServlet(BaseFederationServlet):
defer.returnValue((200, content))
+class FederationMakeLeaveServlet(BaseFederationServlet):
+ PATH = "/make_leave/([^/]*)/([^/]*)"
+
+ @defer.inlineCallbacks
+ def on_GET(self, origin, content, query, context, user_id):
+ content = yield self.handler.on_make_leave_request(context, user_id)
+ defer.returnValue((200, content))
+
+
+class FederationSendLeaveServlet(BaseFederationServlet):
+ PATH = "/send_leave/([^/]*)/([^/]*)"
+
+ @defer.inlineCallbacks
+ def on_PUT(self, origin, content, query, room_id, txid):
+ content = yield self.handler.on_send_leave_request(origin, content)
+ defer.returnValue((200, content))
+
+
class FederationEventAuthServlet(BaseFederationServlet):
PATH = "/event_auth/([^/]*)/([^/]*)"
@@ -325,6 +343,17 @@ class FederationInviteServlet(BaseFederationServlet):
defer.returnValue((200, content))
+class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
+ PATH = "/exchange_third_party_invite/([^/]*)"
+
+ @defer.inlineCallbacks
+ def on_PUT(self, origin, content, query, room_id):
+ content = yield self.handler.on_exchange_third_party_invite_request(
+ origin, room_id, content
+ )
+ defer.returnValue((200, content))
+
+
class FederationClientKeysQueryServlet(BaseFederationServlet):
PATH = "/user/keys/query"
@@ -378,6 +407,30 @@ class FederationGetMissingEventsServlet(BaseFederationServlet):
defer.returnValue((200, content))
+class On3pidBindServlet(BaseFederationServlet):
+ PATH = "/3pid/onbind"
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ content_bytes = request.content.read()
+ content = json.loads(content_bytes)
+ if "invites" in content:
+ last_exception = None
+ for invite in content["invites"]:
+ try:
+ yield self.handler.exchange_third_party_invite(invite)
+ except Exception as e:
+ last_exception = e
+ if last_exception:
+ raise last_exception
+ defer.returnValue((200, {}))
+
+ # Avoid doing remote HS authorization checks which are done by default by
+ # BaseFederationServlet.
+ def _wrap(self, code):
+ return code
+
+
SERVLET_CLASSES = (
FederationPullServlet,
FederationEventServlet,
@@ -385,12 +438,16 @@ SERVLET_CLASSES = (
FederationBackfillServlet,
FederationQueryServlet,
FederationMakeJoinServlet,
+ FederationMakeLeaveServlet,
FederationEventServlet,
FederationSendJoinServlet,
+ FederationSendLeaveServlet,
FederationInviteServlet,
FederationQueryAuthServlet,
FederationGetMissingEventsServlet,
FederationEventAuthServlet,
FederationClientKeysQueryServlet,
FederationClientKeysClaimServlet,
+ FederationThirdPartyInviteExchangeServlet,
+ On3pidBindServlet,
)
|