summary refs log tree commit diff
path: root/synapse/federation/federation_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/federation_client.py')
-rw-r--r--synapse/federation/federation_client.py42
1 files changed, 25 insertions, 17 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 904c7c0945..d3b46b24c1 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -22,6 +22,7 @@ from .units import Edu
 from synapse.api.errors import (
     CodeMessageException, HttpResponseException, SynapseError,
 )
+from synapse.util import unwrapFirstError
 from synapse.util.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
@@ -164,16 +165,17 @@ class FederationClient(FederationBase):
             for p in transaction_data["pdus"]
         ]
 
-        for i, pdu in enumerate(pdus):
-            pdus[i] = yield self._check_sigs_and_hash(pdu)
-
-            # FIXME: We should handle signature failures more gracefully.
+        # FIXME: We should handle signature failures more gracefully.
+        pdus[:] = yield defer.gatherResults(
+            [self._check_sigs_and_hash(pdu) for pdu in pdus],
+            consumeErrors=True,
+        ).addErrback(unwrapFirstError)
 
         defer.returnValue(pdus)
 
     @defer.inlineCallbacks
     @log_function
-    def get_pdu(self, destinations, event_id, outlier=False):
+    def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
         """Requests the PDU with given origin and ID from the remote home
         servers.
 
@@ -189,6 +191,8 @@ class FederationClient(FederationBase):
             outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
                 it's from an arbitary point in the context as opposed to part
                 of the current block of PDUs. Defaults to `False`
+            timeout (int): How long to try (in ms) each destination for before
+                moving to the next destination. None indicates no timeout.
 
         Returns:
             Deferred: Results in the requested PDU.
@@ -212,7 +216,7 @@ class FederationClient(FederationBase):
 
                 with limiter:
                     transaction_data = yield self.transport_layer.get_event(
-                        destination, event_id
+                        destination, event_id, timeout=timeout,
                     )
 
                     logger.debug("transaction_data %r", transaction_data)
@@ -222,7 +226,7 @@ class FederationClient(FederationBase):
                         for p in transaction_data["pdus"]
                     ]
 
-                    if pdu_list:
+                    if pdu_list and pdu_list[0]:
                         pdu = pdu_list[0]
 
                         # Check signatures are correct.
@@ -255,7 +259,7 @@ class FederationClient(FederationBase):
                 )
                 continue
 
-        if self._get_pdu_cache is not None:
+        if self._get_pdu_cache is not None and pdu:
             self._get_pdu_cache[event_id] = pdu
 
         defer.returnValue(pdu)
@@ -370,13 +374,17 @@ class FederationClient(FederationBase):
                     for p in content.get("auth_chain", [])
                 ]
 
-                signed_state = yield self._check_sigs_and_hash_and_fetch(
-                    destination, state, outlier=True
-                )
-
-                signed_auth = yield self._check_sigs_and_hash_and_fetch(
-                    destination, auth_chain, outlier=True
-                )
+                signed_state, signed_auth = yield defer.gatherResults(
+                    [
+                        self._check_sigs_and_hash_and_fetch(
+                            destination, state, outlier=True
+                        ),
+                        self._check_sigs_and_hash_and_fetch(
+                            destination, auth_chain, outlier=True
+                        )
+                    ],
+                    consumeErrors=True
+                ).addErrback(unwrapFirstError)
 
                 auth_chain.sort(key=lambda e: e.depth)
 
@@ -518,7 +526,7 @@ class FederationClient(FederationBase):
             # Are we missing any?
 
             seen_events = set(earliest_events_ids)
-            seen_events.update(e.event_id for e in signed_events)
+            seen_events.update(e.event_id for e in signed_events if e)
 
             missing_events = {}
             for e in itertools.chain(latest_events, signed_events):
@@ -561,7 +569,7 @@ class FederationClient(FederationBase):
 
             res = yield defer.DeferredList(deferreds, consumeErrors=True)
             for (result, val), (e_id, _) in zip(res, ordered_missing):
-                if result:
+                if result and val:
                     signed_events.append(val)
                 else:
                     failed_to_fetch.add(e_id)