summary refs log tree commit diff
path: root/synapse/federation/replication.py
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2014-12-10 16:14:17 +0000
committerMark Haines <mark.haines@matrix.org>2014-12-10 16:14:17 +0000
commit61fc37e467bafe8b1178ec35daf0655049b3cc73 (patch)
treefd47305db5a3d6d8f08514d0c1620628b35495a2 /synapse/federation/replication.py
parentimport Image as PIL.Image. (diff)
parentpoint the entry_point for synapse-homeserver at the right method (diff)
downloadsynapse-61fc37e467bafe8b1178ec35daf0655049b3cc73.tar.xz
Merge branch 'develop' into media_repository
Diffstat (limited to 'synapse/federation/replication.py')
-rw-r--r--synapse/federation/replication.py97
1 files changed, 75 insertions, 22 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index fa2463d4a3..01f87fe423 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -283,6 +283,22 @@ class ReplicationLayer(object):
 
     @defer.inlineCallbacks
     @log_function
+    def get_event_auth(self, destination, context, event_id):
+        res = yield self.transport_layer.get_event_auth(
+            destination, context, event_id,
+        )
+
+        auth_chain = [
+            self.event_from_pdu_json(p, outlier=True)
+            for p in res["auth_chain"]
+        ]
+
+        auth_chain.sort(key=lambda e: e.depth)
+
+        defer.returnValue(auth_chain)
+
+    @defer.inlineCallbacks
+    @log_function
     def on_backfill_request(self, origin, context, versions, limit):
         pdus = yield self.handler.on_backfill_request(
             origin, context, versions, limit
@@ -481,11 +497,17 @@ class ReplicationLayer(object):
         # FIXME: We probably want to do something with the auth_chain given
         # to us
 
-        # auth_chain = [
-        #    Pdu(outlier=True, **p) for p in content.get("auth_chain", [])
-        # ]
+        auth_chain = [
+            self.event_from_pdu_json(p, outlier=True)
+            for p in content.get("auth_chain", [])
+        ]
+
+        auth_chain.sort(key=lambda e: e.depth)
 
-        defer.returnValue(state)
+        defer.returnValue({
+            "state": state,
+            "auth_chain": auth_chain,
+        })
 
     @defer.inlineCallbacks
     def send_invite(self, destination, context, event_id, pdu):
@@ -543,20 +565,34 @@ class ReplicationLayer(object):
         state = None
 
         # We need to make sure we have all the auth events.
-        for e_id, _ in pdu.auth_events:
-            exists = yield self._get_persisted_pdu(
-                origin,
-                e_id,
-                do_auth=False
-            )
-
-            if not exists:
-                yield self.get_pdu(
-                    origin,
-                    event_id=e_id,
-                    outlier=True,
-                )
-                logger.debug("Processed pdu %s", e_id)
+        # for e_id, _ in pdu.auth_events:
+        #     exists = yield self._get_persisted_pdu(
+        #         origin,
+        #         e_id,
+        #         do_auth=False
+        #     )
+        #
+        #     if not exists:
+        #         try:
+        #             logger.debug(
+        #                 "_handle_new_pdu fetch missing auth event %s from %s",
+        #                 e_id,
+        #                 origin,
+        #             )
+        #
+        #             yield self.get_pdu(
+        #                 origin,
+        #                 event_id=e_id,
+        #                 outlier=True,
+        #             )
+        #
+        #             logger.debug("Processed pdu %s", e_id)
+        #         except:
+        #             logger.warn(
+        #                 "Failed to get auth event %s from %s",
+        #                 e_id,
+        #                 origin
+        #             )
 
         # Get missing pdus if necessary.
         if not pdu.outlier:
@@ -565,6 +601,11 @@ class ReplicationLayer(object):
                 pdu.room_id
             )
 
+            logger.debug(
+                "_handle_new_pdu min_depth for %s: %d",
+                pdu.room_id, min_depth
+            )
+
             if min_depth and pdu.depth > min_depth:
                 for event_id, hashes in pdu.prev_events:
                     exists = yield self._get_persisted_pdu(
@@ -574,11 +615,14 @@ class ReplicationLayer(object):
                     )
 
                     if not exists:
-                        logger.debug("Requesting pdu %s", event_id)
+                        logger.debug(
+                            "_handle_new_pdu requesting pdu %s",
+                            event_id
+                        )
 
                         try:
                             yield self.get_pdu(
-                                pdu.origin,
+                                origin,
                                 event_id=event_id,
                             )
                             logger.debug("Processed pdu %s", event_id)
@@ -588,12 +632,17 @@ class ReplicationLayer(object):
             else:
                 # We need to get the state at this event, since we have reached
                 # a backward extremity edge.
+                logger.debug(
+                    "_handle_new_pdu getting state for %s",
+                    pdu.room_id
+                )
                 state = yield self.get_state_for_context(
                     origin, pdu.room_id, pdu.event_id,
                 )
 
         if not backfilled:
             ret = yield self.handler.on_receive_pdu(
+                origin,
                 pdu,
                 backfilled=backfilled,
                 state=state,
@@ -804,7 +853,10 @@ class _TransactionQueue(object):
 
                 # Ensures we don't continue until all callbacks on that
                 # deferred have fired
-                yield deferred
+                try:
+                    yield deferred
+                except:
+                    pass
 
             logger.debug("TX [%s] Yielded to callbacks", destination)
 
@@ -816,7 +868,8 @@ class _TransactionQueue(object):
             logger.exception(e)
 
             for deferred in deferreds:
-                deferred.errback(e)
+                if not deferred.called:
+                    deferred.errback(e)
 
         finally:
             # We want to be *very* sure we delete this after we stop processing