summary refs log tree commit diff
path: root/synapse/federation/federation_client.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-03-05 16:08:02 +0000
committerErik Johnston <erik@matrix.org>2015-03-05 16:08:02 +0000
commitae702d161ab6d518caa91759ec6bdec01b11954f (patch)
treefd70589e10c80af4cdf81a6554b8993868614b68 /synapse/federation/federation_client.py
parentGive LruCache a __len__, so that len(cache) works (diff)
downloadsynapse-ae702d161ab6d518caa91759ec6bdec01b11954f.tar.xz
Handle if get_missing_pdu returns 400 or not all events.
Diffstat (limited to 'synapse/federation/federation_client.py')
-rw-r--r--synapse/federation/federation_client.py109
1 files changed, 96 insertions, 13 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index ca89a0787c..b87c8a3bbb 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -19,14 +19,18 @@ from twisted.internet import defer
 from .federation_base import FederationBase
 from .units import Edu
 
-from synapse.api.errors import CodeMessageException, SynapseError
+from synapse.api.errors import (
+    CodeMessageException, HttpResponseException, SynapseError,
+)
 from synapse.util.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
 
 from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
 
+import itertools
 import logging
+import random
 
 
 logger = logging.getLogger(__name__)
@@ -440,21 +444,100 @@ class FederationClient(FederationBase):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def get_missing_events(self, destination, room_id, earliest_events,
+    def get_missing_events(self, destination, room_id, earliest_events_ids,
                            latest_events, limit, min_depth):
-        content = yield self.transport_layer.get_missing_events(
-            destination, room_id, earliest_events, latest_events, limit,
-            min_depth,
-        )
+        try:
+            content = yield self.transport_layer.get_missing_events(
+                destination=destination,
+                room_id=room_id,
+                earliest_events=earliest_events_ids,
+                latest_events=[e.event_id for e in latest_events],
+                limit=limit,
+                min_depth=min_depth,
+            )
+
+            events = [
+                self.event_from_pdu_json(e)
+                for e in content.get("events", [])
+            ]
+
+            signed_events = yield self._check_sigs_and_hash_and_fetch(
+                destination, events, outlier=True
+            )
+
+            have_gotten_all_from_destination = True
+        except HttpResponseException as e:
+            if not e.code == 400:
+                raise
 
-        events = [
-            self.event_from_pdu_json(e)
-            for e in content.get("events", [])
-        ]
+            signed_events = []
+            have_gotten_all_from_destination = False
 
-        signed_events = yield self._check_sigs_and_hash_and_fetch(
-            destination, events, outlier=True
-        )
+        if len(signed_events) >= limit:
+            defer.returnValue(signed_events)
+
+        servers = yield self.store.get_joined_hosts_for_room(room_id)
+
+        servers = set(servers)
+        servers.discard(self.server_name)
+
+        failed_to_fetch = set()
+
+        while len(signed_events) < limit:
+            # Are we missing any?
+
+            seen_events = set(earliest_events_ids)
+            seen_events.update(e.event_id for e in signed_events)
+
+            missing_events = {}
+            for e in itertools.chain(latest_events, signed_events):
+                missing_events.update({
+                    e_id: e.depth for e_id, _ in e.prev_events
+                    if e_id not in seen_events and e_id not in failed_to_fetch
+                })
+
+            if not missing_events:
+                break
+
+            have_seen = yield self.store.have_events(missing_events)
+
+            for k in have_seen:
+                missing_events.pop(k, None)
+
+            if not missing_events:
+                break
+
+            # Okay, we haven't gotten everything yet. Lets get them.
+            ordered_missing = sorted(missing_events.items(), key=lambda x: x[0])
+
+            if have_gotten_all_from_destination:
+                servers.discard(destination)
+
+            def random_server_list():
+                srvs = list(servers)
+                random.shuffle(srvs)
+                return srvs
+
+            deferreds = [
+                self.get_pdu(
+                    destinations=random_server_list(),
+                    event_id=e_id,
+                )
+                for e_id, depth in ordered_missing[:limit - len(signed_events)]
+            ]
+
+            got_a_new_event = False
+
+            res = yield defer.DeferredList(deferreds, consumeErrors=True)
+            for (result, val), (e_id, _) in zip(res, ordered_missing):
+                if result:
+                    signed_events.append(val)
+                    got_a_new_event = True
+                else:
+                    failed_to_fetch.add(e_id)
+
+            if not got_a_new_event:
+                break
 
         defer.returnValue(signed_events)