diff options
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 330 |
1 files changed, 191 insertions, 139 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 925eb5376e..e23c5c2195 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -17,12 +17,11 @@ from ._base import BaseHandler -from synapse.api.events.utils import prune_event +from synapse.events.utils import prune_event from synapse.api.errors import ( AuthError, FederationError, SynapseError, StoreError, ) -from synapse.api.events.room import RoomMemberEvent, RoomCreateEvent -from synapse.api.constants import Membership +from synapse.api.constants import EventTypes, Membership from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.crypto.event_signing import ( @@ -76,7 +75,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def handle_new_event(self, event, snapshot): + def handle_new_event(self, event, snapshot, destinations): """ Takes in an event from the client to server side, that has already been authed and handled by the state module, and sends it to any remote home servers that may be interested. @@ -92,16 +91,12 @@ class FederationHandler(BaseHandler): yield run_on_reactor() - pdu = event - - if not hasattr(pdu, "destinations") or not pdu.destinations: - pdu.destinations = [] - - yield self.replication_layer.send_pdu(pdu) + self.replication_layer.send_pdu(event, destinations) @log_function @defer.inlineCallbacks - def on_receive_pdu(self, origin, pdu, backfilled, state=None): + def on_receive_pdu(self, origin, pdu, backfilled, state=None, + auth_chain=None): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it through the StateHandler. """ @@ -140,7 +135,7 @@ class FederationHandler(BaseHandler): if not check_event_content_hash(event): logger.warn( "Event content has been tampered, redacting %s, %s", - event.event_id, encode_canonical_json(event.get_full_dict()) + event.event_id, encode_canonical_json(event.get_dict()) ) event = redacted_event @@ -153,43 +148,44 @@ class FederationHandler(BaseHandler): event.room_id, self.server_name ) - if not is_in_room and not event.outlier: + if not is_in_room and not event.internal_metadata.outlier: logger.debug("Got event for room we're not in.") - replication_layer = self.replication_layer - auth_chain = yield replication_layer.get_event_auth( - origin, - context=event.room_id, - event_id=event.event_id, - ) + replication = self.replication_layer + + if not state: + state, auth_chain = yield replication.get_state_for_context( + origin, context=event.room_id, event_id=event.event_id, + ) + + if not auth_chain: + auth_chain = yield replication.get_event_auth( + origin, + context=event.room_id, + event_id=event.event_id, + ) for e in auth_chain: - e.outlier = True + e.internal_metadata.outlier = True try: - yield self._handle_new_event(e, fetch_missing=False) + yield self._handle_new_event(e, fetch_auth_from=origin) except: logger.exception( - "Failed to parse auth event %s", + "Failed to handle auth event %s", e.event_id, ) - if not state: - state = yield replication_layer.get_state_for_context( - origin, - context=event.room_id, - event_id=event.event_id, - ) - current_state = state if state: for e in state: - e.outlier = True + logging.info("A :) %r", e) + e.internal_metadata.outlier = True try: yield self._handle_new_event(e) except: logger.exception( - "Failed to parse state event %s", + "Failed to handle state event %s", e.event_id, ) @@ -208,6 +204,13 @@ class FederationHandler(BaseHandler): affected=event.event_id, ) + # if we're receiving valid events from an origin, + # it's probably a good idea to mark it as not in retry-state + # for sending (although this is a bit of a leap) + retry_timings = yield self.store.get_destination_retry_timings(origin) + if (retry_timings and retry_timings.retry_last_ts): + self.store.set_destination_retry_timings(origin, 0, 0) + room = yield self.store.get_room(event.room_id) if not room: @@ -222,7 +225,7 @@ class FederationHandler(BaseHandler): if not backfilled: extra_users = [] - if event.type == RoomMemberEvent.TYPE: + if event.type == EventTypes.Member: target_user_id = event.state_key target_user = self.hs.parse_userid(target_user_id) extra_users.append(target_user) @@ -231,7 +234,7 @@ class FederationHandler(BaseHandler): event, extra_users=extra_users ) - if event.type == RoomMemberEvent.TYPE: + if event.type == EventTypes.Member: if event.membership == Membership.JOIN: user = self.hs.parse_userid(event.state_key) yield self.distributor.fire( @@ -258,11 +261,15 @@ class FederationHandler(BaseHandler): event = pdu # FIXME (erikj): Not sure this actually works :/ - yield self.state_handler.annotate_event_with_state(event) + context = yield self.state_handler.compute_event_context(event) - events.append(event) + events.append((event, context)) - yield self.store.persist_event(event, backfilled=True) + yield self.store.persist_event( + event, + context=context, + backfilled=True + ) defer.returnValue(events) @@ -279,13 +286,11 @@ class FederationHandler(BaseHandler): pdu=event ) - - defer.returnValue(pdu) @defer.inlineCallbacks def on_event_auth(self, event_id): - auth = yield self.store.get_auth_chain(event_id) + auth = yield self.store.get_auth_chain([event_id]) for event in auth: event.signatures.update( @@ -325,42 +330,55 @@ class FederationHandler(BaseHandler): event = pdu # We should assert some things. - assert(event.type == RoomMemberEvent.TYPE) + # FIXME: Do this in a nicer way + assert(event.type == EventTypes.Member) assert(event.user_id == joinee) assert(event.state_key == joinee) assert(event.room_id == room_id) - event.outlier = False + event.internal_metadata.outlier = False self.room_queues[room_id] = [] + builder = self.event_builder_factory.new( + event.get_pdu_json() + ) + + handled_events = set() + try: - event.event_id = self.event_factory.create_event_id() - event.origin = self.hs.hostname - event.content = content + builder.event_id = self.event_builder_factory.create_event_id() + builder.origin = self.hs.hostname + builder.content = content if not hasattr(event, "signatures"): - event.signatures = {} + builder.signatures = {} add_hashes_and_signatures( - event, + builder, self.hs.hostname, self.hs.config.signing_key[0], ) + new_event = builder.build() + ret = yield self.replication_layer.send_join( target_host, - event + new_event ) state = ret["state"] auth_chain = ret["auth_chain"] auth_chain.sort(key=lambda e: e.depth) + handled_events.update([s.event_id for s in state]) + handled_events.update([a.event_id for a in auth_chain]) + handled_events.add(new_event.event_id) + logger.debug("do_invite_join auth_chain: %s", auth_chain) logger.debug("do_invite_join state: %s", state) - logger.debug("do_invite_join event: %s", event) + logger.debug("do_invite_join event: %s", new_event) try: yield self.store.store_room( @@ -373,37 +391,36 @@ class FederationHandler(BaseHandler): pass for e in auth_chain: - e.outlier = True + e.internal_metadata.outlier = True try: - yield self._handle_new_event(e, fetch_missing=False) + yield self._handle_new_event(e) except: logger.exception( - "Failed to parse auth event %s", + "Failed to handle auth event %s", e.event_id, ) for e in state: # FIXME: Auth these. - e.outlier = True + e.internal_metadata.outlier = True try: yield self._handle_new_event( - e, - fetch_missing=True + e, fetch_auth_from=target_host ) except: logger.exception( - "Failed to parse state event %s", + "Failed to handle state event %s", e.event_id, ) yield self._handle_new_event( - event, + new_event, state=state, current_state=state, ) yield self.notifier.on_new_room_event( - event, extra_users=[joinee] + new_event, extra_users=[joinee] ) logger.debug("Finished joining %s to %s", joinee, room_id) @@ -412,6 +429,9 @@ class FederationHandler(BaseHandler): del self.room_queues[room_id] for p, origin in room_queue: + if p.event_id in handled_events: + continue + try: self.on_receive_pdu(origin, p, backfilled=False) except: @@ -421,25 +441,24 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def on_make_join_request(self, context, user_id): + def on_make_join_request(self, room_id, user_id): """ We've received a /make_join/ request, so we create a partial join event for the room and return that. We don *not* persist or process it until the other server has signed it and sent it back. """ - event = self.event_factory.create_event( - etype=RoomMemberEvent.TYPE, - content={"membership": Membership.JOIN}, - room_id=context, - user_id=user_id, - state_key=user_id, - ) + builder = self.event_builder_factory.new({ + "type": EventTypes.Member, + "content": {"membership": Membership.JOIN}, + "room_id": room_id, + "sender": user_id, + "state_key": user_id, + }) - snapshot = yield self.store.snapshot_room(event) - snapshot.fill_out_prev_events(event) + event, context = yield self._create_new_client_event( + builder=builder, + ) - yield self.state_handler.annotate_event_with_state(event) - yield self.auth.add_auth_events(event) - self.auth.check(event, auth_events=event.old_state_events) + self.auth.check(event, auth_events=context.auth_events) pdu = event @@ -453,12 +472,24 @@ class FederationHandler(BaseHandler): """ event = pdu - event.outlier = False + logger.debug( + "on_send_join_request: Got event: %s, signatures: %s", + event.event_id, + event.signatures, + ) + + event.internal_metadata.outlier = False - yield self._handle_new_event(event) + context = yield self._handle_new_event(event) + + logger.debug( + "on_send_join_request: After _handle_new_event: %s, sigs: %s", + event.event_id, + event.signatures, + ) extra_users = [] - if event.type == RoomMemberEvent.TYPE: + if event.type == EventTypes.Member: target_user_id = event.state_key target_user = self.hs.parse_userid(target_user_id) extra_users.append(target_user) @@ -467,7 +498,7 @@ class FederationHandler(BaseHandler): event, extra_users=extra_users ) - if event.type == RoomMemberEvent.TYPE: + if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = self.hs.parse_userid(event.state_key) yield self.distributor.fire( @@ -478,9 +509,9 @@ class FederationHandler(BaseHandler): destinations = set() - for k, s in event.state_events.items(): + for k, s in context.current_state.items(): try: - if k[0] == RoomMemberEvent.TYPE: + if k[0] == EventTypes.Member: if s.content["membership"] == Membership.JOIN: destinations.add( self.hs.parse_userid(s.state_key).domain @@ -490,14 +521,21 @@ class FederationHandler(BaseHandler): "Failed to get destination from event %s", s.event_id ) - new_pdu.destinations = list(destinations) + logger.debug( + "on_send_join_request: Sending event: %s, signatures: %s", + event.event_id, + event.signatures, + ) - yield self.replication_layer.send_pdu(new_pdu) + self.replication_layer.send_pdu(new_pdu, destinations) - auth_chain = yield self.store.get_auth_chain(event.event_id) + state_ids = [e.event_id for e in context.current_state.values()] + auth_chain = yield self.store.get_auth_chain(set( + [event.event_id] + state_ids + )) defer.returnValue({ - "state": event.state_events.values(), + "state": context.current_state.values(), "auth_chain": auth_chain, }) @@ -509,7 +547,7 @@ class FederationHandler(BaseHandler): """ event = pdu - event.outlier = True + event.internal_metadata.outlier = True event.signatures.update( compute_event_signature( @@ -519,10 +557,11 @@ class FederationHandler(BaseHandler): ) ) - yield self.state_handler.annotate_event_with_state(event) + context = yield self.state_handler.compute_event_context(event) yield self.store.persist_event( event, + context=context, backfilled=False, ) @@ -552,13 +591,13 @@ class FederationHandler(BaseHandler): } event = yield self.store.get_event(event_id) - if hasattr(event, "state_key"): + if event and event.is_state(): # Get previous state - if hasattr(event, "replaces_state") and event.replaces_state: - prev_event = yield self.store.get_event( - event.replaces_state - ) - results[(event.type, event.state_key)] = prev_event + if "replaces_state" in event.unsigned: + prev_id = event.unsigned["replaces_state"] + if prev_id != event.event_id: + prev_event = yield self.store.get_event(prev_id) + results[(event.type, event.state_key)] = prev_event else: del results[(event.type, event.state_key)] @@ -643,75 +682,88 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _handle_new_event(self, event, state=None, backfilled=False, - current_state=None, fetch_missing=True): - is_new_state = yield self.state_handler.annotate_event_with_state( - event, - old_state=state + current_state=None, fetch_auth_from=None): + + logger.debug( + "_handle_new_event: Before annotate: %s, sigs: %s", + event.event_id, event.signatures, ) - if event.old_state_events: - known_ids = set( - [s.event_id for s in event.old_state_events.values()] - ) - for e_id, _ in event.auth_events: - if e_id not in known_ids: - e = yield self.store.get_event( - e_id, - allow_none=True, - ) + context = yield self.state_handler.compute_event_context( + event, old_state=state + ) - if not e: - # TODO: Do some conflict res to make sure that we're - # not the ones who are wrong. - logger.info( - "Rejecting %s as %s not in %s", - event.event_id, e_id, known_ids, - ) - raise AuthError(403, "Auth events are stale") + logger.debug( + "_handle_new_event: Before auth fetch: %s, sigs: %s", + event.event_id, event.signatures, + ) - auth_events = event.old_state_events - else: - # We need to get the auth events from somewhere. + is_new_state = not event.internal_metadata.is_outlier() + + known_ids = set( + [s.event_id for s in context.auth_events.values()] + ) - # TODO: Don't just hit the DBs? + for e_id, _ in event.auth_events: + if e_id not in known_ids: + e = yield self.store.get_event(e_id, allow_none=True) - auth_events = {} - for e_id, _ in event.auth_events: - e = yield self.store.get_event( - e_id, - allow_none=True, - ) + if not e and fetch_auth_from is not None: + # Grab the auth_chain over federation if we are missing + # auth events. + auth_chain = yield self.replication_layer.get_event_auth( + fetch_auth_from, event.event_id, event.room_id + ) + for auth_event in auth_chain: + yield self._handle_new_event(auth_event) + e = yield self.store.get_event(e_id, allow_none=True) if not e: - e = yield self.replication_layer.get_pdu( - event.origin, e_id, outlier=True + # TODO: Do some conflict res to make sure that we're + # not the ones who are wrong. + logger.info( + "Rejecting %s as %s not in db or %s", + event.event_id, e_id, known_ids, ) + # FIXME: How does raising AuthError work with federation? + raise AuthError(403, "Cannot find auth event") - if e and fetch_missing: - try: - yield self.on_receive_pdu(event.origin, e, False) - except: - logger.exception( - "Failed to parse auth event %s", - e_id, - ) + context.auth_events[(e.type, e.state_key)] = e - if not e: - logger.warn("Can't find auth event %s.", e_id) + logger.debug( + "_handle_new_event: Before hack: %s, sigs: %s", + event.event_id, event.signatures, + ) + + if event.type == EventTypes.Member and not event.auth_events: + if len(event.prev_events) == 1: + c = yield self.store.get_event(event.prev_events[0][0]) + if c.type == EventTypes.Create: + context.auth_events[(c.type, c.state_key)] = c - auth_events[(e.type, e.state_key)] = e + logger.debug( + "_handle_new_event: Before auth check: %s, sigs: %s", + event.event_id, event.signatures, + ) - if event.type == RoomMemberEvent.TYPE and not event.auth_events: - if len(event.prev_events) == 1: - c = yield self.store.get_event(event.prev_events[0][0]) - if c.type == RoomCreateEvent.TYPE: - auth_events[(c.type, c.state_key)] = c + self.auth.check(event, auth_events=context.auth_events) - self.auth.check(event, auth_events=auth_events) + logger.debug( + "_handle_new_event: Before persist_event: %s, sigs: %s", + event.event_id, event.signatures, + ) yield self.store.persist_event( event, + context=context, backfilled=backfilled, is_new_state=(is_new_state and not backfilled), current_state=current_state, ) + + logger.debug( + "_handle_new_event: After persist_event: %s, sigs: %s", + event.event_id, event.signatures, + ) + + defer.returnValue(context) |