diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index ca89a0787c..f131941f45 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -19,14 +19,18 @@ from twisted.internet import defer
from .federation_base import FederationBase
from .units import Edu
-from synapse.api.errors import CodeMessageException, SynapseError
+from synapse.api.errors import (
+ CodeMessageException, HttpResponseException, SynapseError,
+)
from synapse.util.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
+import itertools
import logging
+import random
logger = logging.getLogger(__name__)
@@ -440,21 +444,112 @@ class FederationClient(FederationBase):
defer.returnValue(ret)
@defer.inlineCallbacks
- def get_missing_events(self, destination, room_id, earliest_events,
+ def get_missing_events(self, destination, room_id, earliest_events_ids,
latest_events, limit, min_depth):
- content = yield self.transport_layer.get_missing_events(
- destination, room_id, earliest_events, latest_events, limit,
- min_depth,
- )
+ """Tries to fetch events we are missing. This is called when we receive
+ an event without having received all of its ancestors.
- events = [
- self.event_from_pdu_json(e)
- for e in content.get("events", [])
- ]
+ Args:
+ destination (str)
+ room_id (str)
+ earliest_events_ids (list): List of event ids. Effectively the
+ events we expected to receive, but haven't. `get_missing_events`
+ should only return events that didn't happen before these.
+ latest_events (list): List of events we have received that we don't
+ have all previous events for.
+ limit (int): Maximum number of events to return.
+ min_depth (int): Minimum depth of events tor return.
+ """
+ try:
+ content = yield self.transport_layer.get_missing_events(
+ destination=destination,
+ room_id=room_id,
+ earliest_events=earliest_events_ids,
+ latest_events=[e.event_id for e in latest_events],
+ limit=limit,
+ min_depth=min_depth,
+ )
+
+ events = [
+ self.event_from_pdu_json(e)
+ for e in content.get("events", [])
+ ]
+
+ signed_events = yield self._check_sigs_and_hash_and_fetch(
+ destination, events, outlier=True
+ )
+
+ have_gotten_all_from_destination = True
+ except HttpResponseException as e:
+ if not e.code == 400:
+ raise
- signed_events = yield self._check_sigs_and_hash_and_fetch(
- destination, events, outlier=True
- )
+ # We are probably hitting an old server that doesn't support
+ # get_missing_events
+ signed_events = []
+ have_gotten_all_from_destination = False
+
+ if len(signed_events) >= limit:
+ defer.returnValue(signed_events)
+
+ servers = yield self.store.get_joined_hosts_for_room(room_id)
+
+ servers = set(servers)
+ servers.discard(self.server_name)
+
+ failed_to_fetch = set()
+
+ while len(signed_events) < limit:
+ # Are we missing any?
+
+ seen_events = set(earliest_events_ids)
+ seen_events.update(e.event_id for e in signed_events)
+
+ missing_events = {}
+ for e in itertools.chain(latest_events, signed_events):
+ if e.depth > min_depth:
+ missing_events.update({
+ e_id: e.depth for e_id, _ in e.prev_events
+ if e_id not in seen_events
+ and e_id not in failed_to_fetch
+ })
+
+ if not missing_events:
+ break
+
+ have_seen = yield self.store.have_events(missing_events)
+
+ for k in have_seen:
+ missing_events.pop(k, None)
+
+ if not missing_events:
+ break
+
+ # Okay, we haven't gotten everything yet. Lets get them.
+ ordered_missing = sorted(missing_events.items(), key=lambda x: x[0])
+
+ if have_gotten_all_from_destination:
+ servers.discard(destination)
+
+ def random_server_list():
+ srvs = list(servers)
+ random.shuffle(srvs)
+ return srvs
+
+ deferreds = [
+ self.get_pdu(
+ destinations=random_server_list(),
+ event_id=e_id,
+ )
+ for e_id, depth in ordered_missing[:limit - len(signed_events)]
+ ]
+
+ res = yield defer.DeferredList(deferreds, consumeErrors=True)
+ for (result, val), (e_id, _) in zip(res, ordered_missing):
+ if result:
+ signed_events.append(val)
+ else:
+ failed_to_fetch.add(e_id)
defer.returnValue(signed_events)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 4264d857be..9c7dcdba96 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -413,12 +413,16 @@ class FederationServer(FederationBase):
missing_events = yield self.get_missing_events(
origin,
pdu.room_id,
- earliest_events=list(latest),
- latest_events=[pdu.event_id],
+ earliest_events_ids=list(latest),
+ latest_events=[pdu],
limit=10,
min_depth=min_depth,
)
+ # We want to sort these by depth so we process them and
+ # tell clients about them in order.
+ missing_events.sort(key=lambda x: x.depth)
+
for e in missing_events:
yield self._handle_new_pdu(
origin,
|