diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 946ff97c7d..ae9d227586 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -565,7 +565,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot):
+ def do_invite_join(self, target_hosts, room_id, joinee, content):
""" Attempts to join the `joinee` to the room `room_id` via the
server `target_host`.
@@ -581,50 +581,19 @@ class FederationHandler(BaseHandler):
yield self.store.clean_room_for_join(room_id)
- origin, pdu = yield self.replication_layer.make_join(
+ origin, event = yield self._make_and_verify_event(
target_hosts,
room_id,
joinee,
+ "join",
content
)
- logger.debug("Got response to make_join: %s", pdu)
-
- event = pdu
-
- # We should assert some things.
- # FIXME: Do this in a nicer way
- assert(event.type == EventTypes.Member)
- assert(event.user_id == joinee)
- assert(event.state_key == joinee)
- assert(event.room_id == room_id)
-
- event.internal_metadata.outlier = False
-
self.room_queues[room_id] = []
-
- builder = self.event_builder_factory.new(
- unfreeze(event.get_pdu_json())
- )
-
handled_events = set()
try:
- builder.event_id = self.event_builder_factory.create_event_id()
- builder.origin = self.hs.hostname
- builder.content = content
-
- if not hasattr(event, "signatures"):
- builder.signatures = {}
-
- add_hashes_and_signatures(
- builder,
- self.hs.hostname,
- self.hs.config.signing_key[0],
- )
-
- new_event = builder.build()
-
+ new_event = self._sign_event(event)
# Try the host we successfully got a response to /make_join/
# request first.
try:
@@ -632,11 +601,7 @@ class FederationHandler(BaseHandler):
target_hosts.insert(0, origin)
except ValueError:
pass
-
- ret = yield self.replication_layer.send_join(
- target_hosts,
- new_event
- )
+ ret = yield self.replication_layer.send_join(target_hosts, new_event)
origin = ret["origin"]
state = ret["state"]
@@ -700,7 +665,7 @@ class FederationHandler(BaseHandler):
@log_function
def on_make_join_request(self, room_id, user_id, query):
""" We've received a /make_join/ request, so we create a partial
- join event for the room and return that. We don *not* persist or
+ join event for the room and return that. We do *not* persist or
process it until the other server has signed it and sent it back.
"""
event_content = {"membership": Membership.JOIN}
@@ -860,6 +825,168 @@ class FederationHandler(BaseHandler):
defer.returnValue(event)
@defer.inlineCallbacks
+ def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
+ origin, event = yield self._make_and_verify_event(
+ target_hosts,
+ room_id,
+ user_id,
+ "leave",
+ {}
+ )
+ signed_event = self._sign_event(event)
+
+ # Try the host we successfully got a response to /make_join/
+ # request first.
+ try:
+ target_hosts.remove(origin)
+ target_hosts.insert(0, origin)
+ except ValueError:
+ pass
+
+ yield self.replication_layer.send_leave(
+ target_hosts,
+ signed_event
+ )
+ defer.returnValue(None)
+
+ @defer.inlineCallbacks
+ def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, content):
+ origin, pdu = yield self.replication_layer.make_membership_event(
+ target_hosts,
+ room_id,
+ user_id,
+ membership,
+ content
+ )
+
+ logger.debug("Got response to make_%s: %s", membership, pdu)
+
+ event = pdu
+
+ # We should assert some things.
+ # FIXME: Do this in a nicer way
+ assert(event.type == EventTypes.Member)
+ assert(event.user_id == user_id)
+ assert(event.state_key == user_id)
+ assert(event.room_id == room_id)
+ defer.returnValue((origin, event))
+
+ def _sign_event(self, event):
+ event.internal_metadata.outlier = False
+
+ builder = self.event_builder_factory.new(
+ unfreeze(event.get_pdu_json())
+ )
+
+ builder.event_id = self.event_builder_factory.create_event_id()
+ builder.origin = self.hs.hostname
+
+ if not hasattr(event, "signatures"):
+ builder.signatures = {}
+
+ add_hashes_and_signatures(
+ builder,
+ self.hs.hostname,
+ self.hs.config.signing_key[0],
+ )
+
+ return builder.build()
+
+ @defer.inlineCallbacks
+ @log_function
+ def on_make_leave_request(self, room_id, user_id):
+ """ We've received a /make_leave/ request, so we create a partial
+ join event for the room and return that. We do *not* persist or
+ process it until the other server has signed it and sent it back.
+ """
+ builder = self.event_builder_factory.new({
+ "type": EventTypes.Member,
+ "content": {"membership": Membership.LEAVE},
+ "room_id": room_id,
+ "sender": user_id,
+ "state_key": user_id,
+ })
+
+ event, context = yield self._create_new_client_event(
+ builder=builder,
+ )
+
+ self.auth.check(event, auth_events=context.current_state)
+
+ defer.returnValue(event)
+
+ @defer.inlineCallbacks
+ @log_function
+ def on_send_leave_request(self, origin, pdu):
+ """ We have received a leave event for a room. Fully process it."""
+ event = pdu
+
+ logger.debug(
+ "on_send_leave_request: Got event: %s, signatures: %s",
+ event.event_id,
+ event.signatures,
+ )
+
+ event.internal_metadata.outlier = False
+
+ context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ origin, event
+ )
+
+ logger.debug(
+ "on_send_leave_request: After _handle_new_event: %s, sigs: %s",
+ event.event_id,
+ event.signatures,
+ )
+
+ extra_users = []
+ if event.type == EventTypes.Member:
+ target_user_id = event.state_key
+ target_user = UserID.from_string(target_user_id)
+ extra_users.append(target_user)
+
+ with PreserveLoggingContext():
+ d = self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id, extra_users=extra_users
+ )
+
+ def log_failure(f):
+ logger.warn(
+ "Failed to notify about %s: %s",
+ event.event_id, f.value
+ )
+
+ d.addErrback(log_failure)
+
+ new_pdu = event
+
+ destinations = set()
+
+ for k, s in context.current_state.items():
+ try:
+ if k[0] == EventTypes.Member:
+ if s.content["membership"] == Membership.LEAVE:
+ destinations.add(
+ UserID.from_string(s.state_key).domain
+ )
+ except:
+ logger.warn(
+ "Failed to get destination from event %s", s.event_id
+ )
+
+ destinations.discard(origin)
+
+ logger.debug(
+ "on_send_leave_request: Sending event: %s, signatures: %s",
+ event.event_id,
+ event.signatures,
+ )
+
+ self.replication_layer.send_pdu(new_pdu, destinations)
+
+ defer.returnValue(None)
+
+ @defer.inlineCallbacks
def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True):
yield run_on_reactor()
|