diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index fa2463d4a3..01f87fe423 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -283,6 +283,22 @@ class ReplicationLayer(object):
@defer.inlineCallbacks
@log_function
+ def get_event_auth(self, destination, context, event_id):
+ res = yield self.transport_layer.get_event_auth(
+ destination, context, event_id,
+ )
+
+ auth_chain = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in res["auth_chain"]
+ ]
+
+ auth_chain.sort(key=lambda e: e.depth)
+
+ defer.returnValue(auth_chain)
+
+ @defer.inlineCallbacks
+ @log_function
def on_backfill_request(self, origin, context, versions, limit):
pdus = yield self.handler.on_backfill_request(
origin, context, versions, limit
@@ -481,11 +497,17 @@ class ReplicationLayer(object):
# FIXME: We probably want to do something with the auth_chain given
# to us
- # auth_chain = [
- # Pdu(outlier=True, **p) for p in content.get("auth_chain", [])
- # ]
+ auth_chain = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in content.get("auth_chain", [])
+ ]
+
+ auth_chain.sort(key=lambda e: e.depth)
- defer.returnValue(state)
+ defer.returnValue({
+ "state": state,
+ "auth_chain": auth_chain,
+ })
@defer.inlineCallbacks
def send_invite(self, destination, context, event_id, pdu):
@@ -543,20 +565,34 @@ class ReplicationLayer(object):
state = None
# We need to make sure we have all the auth events.
- for e_id, _ in pdu.auth_events:
- exists = yield self._get_persisted_pdu(
- origin,
- e_id,
- do_auth=False
- )
-
- if not exists:
- yield self.get_pdu(
- origin,
- event_id=e_id,
- outlier=True,
- )
- logger.debug("Processed pdu %s", e_id)
+ # for e_id, _ in pdu.auth_events:
+ # exists = yield self._get_persisted_pdu(
+ # origin,
+ # e_id,
+ # do_auth=False
+ # )
+ #
+ # if not exists:
+ # try:
+ # logger.debug(
+ # "_handle_new_pdu fetch missing auth event %s from %s",
+ # e_id,
+ # origin,
+ # )
+ #
+ # yield self.get_pdu(
+ # origin,
+ # event_id=e_id,
+ # outlier=True,
+ # )
+ #
+ # logger.debug("Processed pdu %s", e_id)
+ # except:
+ # logger.warn(
+ # "Failed to get auth event %s from %s",
+ # e_id,
+ # origin
+ # )
# Get missing pdus if necessary.
if not pdu.outlier:
@@ -565,6 +601,11 @@ class ReplicationLayer(object):
pdu.room_id
)
+ logger.debug(
+ "_handle_new_pdu min_depth for %s: %d",
+ pdu.room_id, min_depth
+ )
+
if min_depth and pdu.depth > min_depth:
for event_id, hashes in pdu.prev_events:
exists = yield self._get_persisted_pdu(
@@ -574,11 +615,14 @@ class ReplicationLayer(object):
)
if not exists:
- logger.debug("Requesting pdu %s", event_id)
+ logger.debug(
+ "_handle_new_pdu requesting pdu %s",
+ event_id
+ )
try:
yield self.get_pdu(
- pdu.origin,
+ origin,
event_id=event_id,
)
logger.debug("Processed pdu %s", event_id)
@@ -588,12 +632,17 @@ class ReplicationLayer(object):
else:
# We need to get the state at this event, since we have reached
# a backward extremity edge.
+ logger.debug(
+ "_handle_new_pdu getting state for %s",
+ pdu.room_id
+ )
state = yield self.get_state_for_context(
origin, pdu.room_id, pdu.event_id,
)
if not backfilled:
ret = yield self.handler.on_receive_pdu(
+ origin,
pdu,
backfilled=backfilled,
state=state,
@@ -804,7 +853,10 @@ class _TransactionQueue(object):
# Ensures we don't continue until all callbacks on that
# deferred have fired
- yield deferred
+ try:
+ yield deferred
+ except:
+ pass
logger.debug("TX [%s] Yielded to callbacks", destination)
@@ -816,7 +868,8 @@ class _TransactionQueue(object):
logger.exception(e)
for deferred in deferreds:
- deferred.errback(e)
+ if not deferred.called:
+ deferred.errback(e)
finally:
# We want to be *very* sure we delete this after we stop processing
|