From ae702d161ab6d518caa91759ec6bdec01b11954f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Mar 2015 16:08:02 +0000 Subject: Handle if get_missing_pdu returns 400 or not all events. --- synapse/federation/federation_client.py | 109 ++++++++++++++++++++++++++++---- synapse/federation/federation_server.py | 6 +- 2 files changed, 100 insertions(+), 15 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ca89a0787c..b87c8a3bbb 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,14 +19,18 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Edu -from synapse.api.errors import CodeMessageException, SynapseError +from synapse.api.errors import ( + CodeMessageException, HttpResponseException, SynapseError, +) from synapse.util.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination +import itertools import logging +import random logger = logging.getLogger(__name__) @@ -440,21 +444,100 @@ class FederationClient(FederationBase): defer.returnValue(ret) @defer.inlineCallbacks - def get_missing_events(self, destination, room_id, earliest_events, + def get_missing_events(self, destination, room_id, earliest_events_ids, latest_events, limit, min_depth): - content = yield self.transport_layer.get_missing_events( - destination, room_id, earliest_events, latest_events, limit, - min_depth, - ) + try: + content = yield self.transport_layer.get_missing_events( + destination=destination, + room_id=room_id, + earliest_events=earliest_events_ids, + latest_events=[e.event_id for e in latest_events], + limit=limit, + min_depth=min_depth, + ) + + events = [ + self.event_from_pdu_json(e) + for e in content.get("events", []) + ] + + signed_events = yield self._check_sigs_and_hash_and_fetch( + destination, events, outlier=True + ) + + have_gotten_all_from_destination = True + except HttpResponseException as e: + if not e.code == 400: + raise - events = [ - self.event_from_pdu_json(e) - for e in content.get("events", []) - ] + signed_events = [] + have_gotten_all_from_destination = False - signed_events = yield self._check_sigs_and_hash_and_fetch( - destination, events, outlier=True - ) + if len(signed_events) >= limit: + defer.returnValue(signed_events) + + servers = yield self.store.get_joined_hosts_for_room(room_id) + + servers = set(servers) + servers.discard(self.server_name) + + failed_to_fetch = set() + + while len(signed_events) < limit: + # Are we missing any? + + seen_events = set(earliest_events_ids) + seen_events.update(e.event_id for e in signed_events) + + missing_events = {} + for e in itertools.chain(latest_events, signed_events): + missing_events.update({ + e_id: e.depth for e_id, _ in e.prev_events + if e_id not in seen_events and e_id not in failed_to_fetch + }) + + if not missing_events: + break + + have_seen = yield self.store.have_events(missing_events) + + for k in have_seen: + missing_events.pop(k, None) + + if not missing_events: + break + + # Okay, we haven't gotten everything yet. Lets get them. + ordered_missing = sorted(missing_events.items(), key=lambda x: x[0]) + + if have_gotten_all_from_destination: + servers.discard(destination) + + def random_server_list(): + srvs = list(servers) + random.shuffle(srvs) + return srvs + + deferreds = [ + self.get_pdu( + destinations=random_server_list(), + event_id=e_id, + ) + for e_id, depth in ordered_missing[:limit - len(signed_events)] + ] + + got_a_new_event = False + + res = yield defer.DeferredList(deferreds, consumeErrors=True) + for (result, val), (e_id, _) in zip(res, ordered_missing): + if result: + signed_events.append(val) + got_a_new_event = True + else: + failed_to_fetch.add(e_id) + + if not got_a_new_event: + break defer.returnValue(signed_events) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 4264d857be..dd4ca74ba6 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -413,12 +413,14 @@ class FederationServer(FederationBase): missing_events = yield self.get_missing_events( origin, pdu.room_id, - earliest_events=list(latest), - latest_events=[pdu.event_id], + earliest_events_ids=list(latest), + latest_events=[pdu], limit=10, min_depth=min_depth, ) + missing_events.sort(key=lambda x: x.depth) + for e in missing_events: yield self._handle_new_pdu( origin, -- cgit 1.4.1 From 6dfd8c73fcdd727cd6589513e2b8059f779623ae Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Mar 2015 16:31:13 +0000 Subject: Docs. --- synapse/federation/federation_client.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) (limited to 'synapse') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b87c8a3bbb..11e2753fed 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -446,6 +446,20 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def get_missing_events(self, destination, room_id, earliest_events_ids, latest_events, limit, min_depth): + """Tries to fetch events we are missing. This is called when we receive + an event without having received all of its ancestors. + + Args: + destination (str) + room_id (str) + earliest_events_ids (list): List of event ids. Effectively the + events we expected to receive, but haven't. `get_missing_events` + should only return events that didn't happen before these. + latest_events (list): List of events we have received that we don't + have all previous events for. + limit (int): Maximum number of events to return. + min_depth (int): Minimum depth of events tor return. + """ try: content = yield self.transport_layer.get_missing_events( destination=destination, @@ -470,6 +484,8 @@ class FederationClient(FederationBase): if not e.code == 400: raise + # We are probably hitting an old server that doesn't support + # get_missing_events signed_events = [] have_gotten_all_from_destination = False -- cgit 1.4.1 From 39aa968a764816632a05ac0e3cf9c865b7a3a68d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Mar 2015 16:31:32 +0000 Subject: Respect min_depth argument --- synapse/federation/federation_client.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 11e2753fed..75b6a7b46a 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -507,10 +507,12 @@ class FederationClient(FederationBase): missing_events = {} for e in itertools.chain(latest_events, signed_events): - missing_events.update({ - e_id: e.depth for e_id, _ in e.prev_events - if e_id not in seen_events and e_id not in failed_to_fetch - }) + if e.depth > min_depth: + missing_events.update({ + e_id: e.depth for e_id, _ in e.prev_events + if e_id not in seen_events + and e_id not in failed_to_fetch + }) if not missing_events: break -- cgit 1.4.1 From 96fee64421a534787e9316a61ab407b43c782dc7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Mar 2015 16:31:47 +0000 Subject: Remove unecessary check --- synapse/federation/federation_client.py | 6 ------ 1 file changed, 6 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 75b6a7b46a..f131941f45 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -544,19 +544,13 @@ class FederationClient(FederationBase): for e_id, depth in ordered_missing[:limit - len(signed_events)] ] - got_a_new_event = False - res = yield defer.DeferredList(deferreds, consumeErrors=True) for (result, val), (e_id, _) in zip(res, ordered_missing): if result: signed_events.append(val) - got_a_new_event = True else: failed_to_fetch.add(e_id) - if not got_a_new_event: - break - defer.returnValue(signed_events) def event_from_pdu_json(self, pdu_json, outlier=False): -- cgit 1.4.1 From 9708f49abfb5fa48c1190364093ab4ce5c4e6f23 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Mar 2015 16:35:16 +0000 Subject: Docs --- synapse/federation/federation_server.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index dd4ca74ba6..9c7dcdba96 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -419,6 +419,8 @@ class FederationServer(FederationBase): min_depth=min_depth, ) + # We want to sort these by depth so we process them and + # tell clients about them in order. missing_events.sort(key=lambda x: x.depth) for e in missing_events: -- cgit 1.4.1