diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 4aec3563ac..195f7c618a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
+# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -91,11 +91,12 @@ class FederationHandler(BaseHandler):
yield run_on_reactor()
- yield self.replication_layer.send_pdu(event, destinations)
+ self.replication_layer.send_pdu(event, destinations)
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, origin, pdu, backfilled, state=None):
+ def on_receive_pdu(self, origin, pdu, backfilled, state=None,
+ auth_chain=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it through the StateHandler.
"""
@@ -150,40 +151,41 @@ class FederationHandler(BaseHandler):
if not is_in_room and not event.internal_metadata.outlier:
logger.debug("Got event for room we're not in.")
- replication_layer = self.replication_layer
- auth_chain = yield replication_layer.get_event_auth(
- origin,
- context=event.room_id,
- event_id=event.event_id,
- )
+ replication = self.replication_layer
+
+ if not state:
+ state, auth_chain = yield replication.get_state_for_context(
+ origin, context=event.room_id, event_id=event.event_id,
+ )
+
+ if not auth_chain:
+ auth_chain = yield replication.get_event_auth(
+ origin,
+ context=event.room_id,
+ event_id=event.event_id,
+ )
for e in auth_chain:
e.internal_metadata.outlier = True
try:
- yield self._handle_new_event(e, fetch_missing=False)
+ yield self._handle_new_event(e, fetch_auth_from=origin)
except:
logger.exception(
- "Failed to parse auth event %s",
+ "Failed to handle auth event %s",
e.event_id,
)
- if not state:
- state = yield replication_layer.get_state_for_context(
- origin,
- context=event.room_id,
- event_id=event.event_id,
- )
-
current_state = state
if state:
for e in state:
+ logging.info("A :) %r", e)
e.internal_metadata.outlier = True
try:
yield self._handle_new_event(e)
except:
logger.exception(
- "Failed to parse state event %s",
+ "Failed to handle state event %s",
e.event_id,
)
@@ -288,7 +290,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def on_event_auth(self, event_id):
- auth = yield self.store.get_auth_chain(event_id)
+ auth = yield self.store.get_auth_chain([event_id])
for event in auth:
event.signatures.update(
@@ -391,10 +393,10 @@ class FederationHandler(BaseHandler):
for e in auth_chain:
e.internal_metadata.outlier = True
try:
- yield self._handle_new_event(e, fetch_missing=False)
+ yield self._handle_new_event(e)
except:
logger.exception(
- "Failed to parse auth event %s",
+ "Failed to handle auth event %s",
e.event_id,
)
@@ -403,12 +405,11 @@ class FederationHandler(BaseHandler):
e.internal_metadata.outlier = True
try:
yield self._handle_new_event(
- e,
- fetch_missing=True
+ e, fetch_auth_from=target_host
)
except:
logger.exception(
- "Failed to parse state event %s",
+ "Failed to handle state event %s",
e.event_id,
)
@@ -526,9 +527,12 @@ class FederationHandler(BaseHandler):
event.signatures,
)
- yield self.replication_layer.send_pdu(new_pdu, destinations)
+ self.replication_layer.send_pdu(new_pdu, destinations)
- auth_chain = yield self.store.get_auth_chain(event.event_id)
+ state_ids = [e.event_id for e in context.current_state.values()]
+ auth_chain = yield self.store.get_auth_chain(set(
+ [event.event_id] + state_ids
+ ))
defer.returnValue({
"state": context.current_state.values(),
@@ -613,13 +617,13 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def on_backfill_request(self, origin, context, pdu_list, limit):
- in_room = yield self.auth.check_host_in_room(context, origin)
+ def on_backfill_request(self, origin, room_id, pdu_list, limit):
+ in_room = yield self.auth.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
events = yield self.store.get_backfill_events(
- context,
+ room_id,
pdu_list,
limit
)
@@ -678,7 +682,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _handle_new_event(self, event, state=None, backfilled=False,
- current_state=None, fetch_missing=True):
+ current_state=None, fetch_auth_from=None):
logger.debug(
"_handle_new_event: Before annotate: %s, sigs: %s",
@@ -699,11 +703,20 @@ class FederationHandler(BaseHandler):
known_ids = set(
[s.event_id for s in context.auth_events.values()]
)
+
for e_id, _ in event.auth_events:
if e_id not in known_ids:
- e = yield self.store.get_event(
- e_id, allow_none=True,
- )
+ e = yield self.store.get_event(e_id, allow_none=True)
+
+ if not e and fetch_auth_from is not None:
+ # Grab the auth_chain over federation if we are missing
+ # auth events.
+ auth_chain = yield self.replication_layer.get_event_auth(
+ fetch_auth_from, event.event_id, event.room_id
+ )
+ for auth_event in auth_chain:
+ yield self._handle_new_event(auth_event)
+ e = yield self.store.get_event(e_id, allow_none=True)
if not e:
# TODO: Do some conflict res to make sure that we're
@@ -713,7 +726,7 @@ class FederationHandler(BaseHandler):
event.event_id, e_id, known_ids,
)
# FIXME: How does raising AuthError work with federation?
- raise AuthError(403, "Auth events are stale")
+ raise AuthError(403, "Cannot find auth event")
context.auth_events[(e.type, e.state_key)] = e
|