diff options
Diffstat (limited to 'synapse/federation/replication.py')
-rw-r--r-- | synapse/federation/replication.py | 97 |
1 files changed, 75 insertions, 22 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index fa2463d4a3..01f87fe423 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -283,6 +283,22 @@ class ReplicationLayer(object): @defer.inlineCallbacks @log_function + def get_event_auth(self, destination, context, event_id): + res = yield self.transport_layer.get_event_auth( + destination, context, event_id, + ) + + auth_chain = [ + self.event_from_pdu_json(p, outlier=True) + for p in res["auth_chain"] + ] + + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue(auth_chain) + + @defer.inlineCallbacks + @log_function def on_backfill_request(self, origin, context, versions, limit): pdus = yield self.handler.on_backfill_request( origin, context, versions, limit @@ -481,11 +497,17 @@ class ReplicationLayer(object): # FIXME: We probably want to do something with the auth_chain given # to us - # auth_chain = [ - # Pdu(outlier=True, **p) for p in content.get("auth_chain", []) - # ] + auth_chain = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("auth_chain", []) + ] + + auth_chain.sort(key=lambda e: e.depth) - defer.returnValue(state) + defer.returnValue({ + "state": state, + "auth_chain": auth_chain, + }) @defer.inlineCallbacks def send_invite(self, destination, context, event_id, pdu): @@ -543,20 +565,34 @@ class ReplicationLayer(object): state = None # We need to make sure we have all the auth events. - for e_id, _ in pdu.auth_events: - exists = yield self._get_persisted_pdu( - origin, - e_id, - do_auth=False - ) - - if not exists: - yield self.get_pdu( - origin, - event_id=e_id, - outlier=True, - ) - logger.debug("Processed pdu %s", e_id) + # for e_id, _ in pdu.auth_events: + # exists = yield self._get_persisted_pdu( + # origin, + # e_id, + # do_auth=False + # ) + # + # if not exists: + # try: + # logger.debug( + # "_handle_new_pdu fetch missing auth event %s from %s", + # e_id, + # origin, + # ) + # + # yield self.get_pdu( + # origin, + # event_id=e_id, + # outlier=True, + # ) + # + # logger.debug("Processed pdu %s", e_id) + # except: + # logger.warn( + # "Failed to get auth event %s from %s", + # e_id, + # origin + # ) # Get missing pdus if necessary. if not pdu.outlier: @@ -565,6 +601,11 @@ class ReplicationLayer(object): pdu.room_id ) + logger.debug( + "_handle_new_pdu min_depth for %s: %d", + pdu.room_id, min_depth + ) + if min_depth and pdu.depth > min_depth: for event_id, hashes in pdu.prev_events: exists = yield self._get_persisted_pdu( @@ -574,11 +615,14 @@ class ReplicationLayer(object): ) if not exists: - logger.debug("Requesting pdu %s", event_id) + logger.debug( + "_handle_new_pdu requesting pdu %s", + event_id + ) try: yield self.get_pdu( - pdu.origin, + origin, event_id=event_id, ) logger.debug("Processed pdu %s", event_id) @@ -588,12 +632,17 @@ class ReplicationLayer(object): else: # We need to get the state at this event, since we have reached # a backward extremity edge. + logger.debug( + "_handle_new_pdu getting state for %s", + pdu.room_id + ) state = yield self.get_state_for_context( origin, pdu.room_id, pdu.event_id, ) if not backfilled: ret = yield self.handler.on_receive_pdu( + origin, pdu, backfilled=backfilled, state=state, @@ -804,7 +853,10 @@ class _TransactionQueue(object): # Ensures we don't continue until all callbacks on that # deferred have fired - yield deferred + try: + yield deferred + except: + pass logger.debug("TX [%s] Yielded to callbacks", destination) @@ -816,7 +868,8 @@ class _TransactionQueue(object): logger.exception(e) for deferred in deferreds: - deferred.errback(e) + if not deferred.called: + deferred.errback(e) finally: # We want to be *very* sure we delete this after we stop processing |