diff options
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r-- | synapse/federation/federation_server.py | 123 |
1 files changed, 68 insertions, 55 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 22b9663831..9c7dcdba96 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -112,17 +112,20 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) with PreserveLoggingContext(): - dl = [] + results = [] + for pdu in pdu_list: d = self._handle_new_pdu(transaction.origin, pdu) - def handle_failure(failure): - failure.trap(FederationError) - self.send_failure(failure.value, transaction.origin) - - d.addErrback(handle_failure) - - dl.append(d) + try: + yield d + results.append({}) + except FederationError as e: + self.send_failure(e, transaction.origin) + results.append({"error": str(e)}) + except Exception as e: + results.append({"error": str(e)}) + logger.exception("Failed to handle PDU") if hasattr(transaction, "edus"): for edu in [Edu(**x) for x in transaction.edus]: @@ -135,21 +138,11 @@ class FederationServer(FederationBase): for failure in getattr(transaction, "pdu_failures", []): logger.info("Got failure %r", failure) - results = yield defer.DeferredList(dl, consumeErrors=True) - - ret = [] - for r in results: - if r[0]: - ret.append({}) - else: - logger.exception(r[1]) - ret.append({"error": str(r[1].value)}) - - logger.debug("Returning: %s", str(ret)) + logger.debug("Returning: %s", str(results)) response = { "pdus": dict(zip( - (p.event_id for p in pdu_list), ret + (p.event_id for p in pdu_list), results )), } @@ -305,6 +298,20 @@ class FederationServer(FederationBase): (200, send_content) ) + @defer.inlineCallbacks + @log_function + def on_get_missing_events(self, origin, room_id, earliest_events, + latest_events, limit, min_depth): + missing_events = yield self.handler.on_get_missing_events( + origin, room_id, earliest_events, latest_events, limit, min_depth + ) + + time_now = self._clock.time_msec() + + defer.returnValue({ + "events": [ev.get_pdu_json(time_now) for ev in missing_events], + }) + @log_function def _get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. @@ -331,7 +338,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function - def _handle_new_pdu(self, origin, pdu, max_recursion=10): + def _handle_new_pdu(self, origin, pdu, get_missing=True): # We reprocess pdus when we have seen them only as outliers existing = yield self._get_persisted_pdu( origin, pdu.event_id, do_auth=False @@ -383,48 +390,54 @@ class FederationServer(FederationBase): pdu.room_id, min_depth ) + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if min_depth and pdu.depth < min_depth: # This is so that we don't notify the user about this # message, to work around the fact that some events will # reference really really old events we really don't want to # send to the clients. pdu.internal_metadata.outlier = True - elif min_depth and pdu.depth > min_depth and max_recursion > 0: - for event_id, hashes in pdu.prev_events: - if event_id not in have_seen: - logger.debug( - "_handle_new_pdu requesting pdu %s", - event_id + elif min_depth and pdu.depth > min_depth: + if get_missing and prevs - seen: + latest_tuples = yield self.store.get_latest_events_in_room( + pdu.room_id + ) + + # We add the prev events that we have seen to the latest + # list to ensure the remote server doesn't give them to us + latest = set(e_id for e_id, _, _ in latest_tuples) + latest |= seen + + missing_events = yield self.get_missing_events( + origin, + pdu.room_id, + earliest_events_ids=list(latest), + latest_events=[pdu], + limit=10, + min_depth=min_depth, + ) + + # We want to sort these by depth so we process them and + # tell clients about them in order. + missing_events.sort(key=lambda x: x.depth) + + for e in missing_events: + yield self._handle_new_pdu( + origin, + e, + get_missing=False ) - try: - new_pdu = yield self.federation_client.get_pdu( - [origin, pdu.origin], - event_id=event_id, - ) - - if new_pdu: - yield self._handle_new_pdu( - origin, - new_pdu, - max_recursion=max_recursion-1 - ) - - logger.debug("Processed pdu %s", event_id) - else: - logger.warn("Failed to get PDU %s", event_id) - fetch_state = True - except: - # TODO(erikj): Do some more intelligent retries. - logger.exception("Failed to get PDU") - fetch_state = True - else: - prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) - if prevs - seen: - fetch_state = True - else: - fetch_state = True + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) + + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if prevs - seen: + fetch_state = True if fetch_state: # We need to get the state at this event, since we haven't |