diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 34bc397e8a..f74e16abd5 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -142,7 +142,15 @@ class FederationServer(FederationBase):
if r[0]:
ret.append({})
else:
- logger.exception(r[1])
+ failure = r[1]
+ logger.error(
+ "Failed to handle PDU",
+ exc_info=(
+ failure.type,
+ failure.value,
+ failure.getTracebackObject()
+ )
+ )
ret.append({"error": str(r[1].value)})
logger.debug("Returning: %s", str(ret))
@@ -306,75 +314,17 @@ class FederationServer(FederationBase):
)
@defer.inlineCallbacks
- def get_missing_events(self, origin, room_id, earliest_events,
- latest_events, limit, min_depth):
- limit = max(limit, 50)
- min_depth = max(min_depth, 0)
-
- missing_events = yield self.store.get_missing_events(
- room_id=room_id,
- earliest_events=earliest_events,
- latest_events=latest_events,
- limit=limit,
- min_depth=min_depth,
+ @log_function
+ def on_get_missing_events(self, origin, room_id, earliest_events,
+ latest_events, limit, min_depth):
+ missing_events = yield self.handler.on_get_missing_events(
+ origin, room_id, earliest_events, latest_events, limit, min_depth
)
- known_ids = {e.event_id for e in missing_events} | {earliest_events}
-
- back_edges = {
- e for e in missing_events
- if {i for i, h in e.prev_events.items()} <= known_ids
- }
-
- decoded_auth_events = set()
- state = {}
- auth_events = set()
- auth_and_state = {}
- for event in back_edges:
- state_pdus = yield self.handler.get_state_for_pdu(
- origin, room_id, event.event_id,
- do_auth=False,
- )
-
- state[event.event_id] = [s.event_id for s in state_pdus]
-
- auth_and_state.update({
- s.event_id: s for s in state_pdus
- })
-
- state_ids = {pdu.event_id for pdu in state_pdus}
- prev_ids = {i for i, h in event.prev_events.items()}
- partial_auth_chain = yield self.store.get_auth_chain(
- state_ids | prev_ids, have_ids=decoded_auth_events.keys()
- )
-
- for p in partial_auth_chain:
- p.signatures.update(
- compute_event_signature(
- p,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
- auth_events.update(
- a.event_id for a in partial_auth_chain
- )
-
- auth_and_state.update({
- a.event_id: a for a in partial_auth_chain
- })
-
time_now = self._clock.time_msec()
defer.returnValue({
"events": [ev.get_pdu_json(time_now) for ev in missing_events],
- "state_for_events": state,
- "auth_events": auth_events,
- "event_map": {
- k: ev.get_pdu_json(time_now)
- for k, ev in auth_and_state.items()
- },
})
@log_function
@@ -403,7 +353,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
@log_function
- def _handle_new_pdu(self, origin, pdu, max_recursion=10):
+ def _handle_new_pdu(self, origin, pdu, get_missing=True):
# We reprocess pdus when we have seen them only as outliers
existing = yield self._get_persisted_pdu(
origin, pdu.event_id, do_auth=False
@@ -455,48 +405,50 @@ class FederationServer(FederationBase):
pdu.room_id, min_depth
)
+ prevs = {e_id for e_id, _ in pdu.prev_events}
+ seen = set(have_seen.keys())
+
if min_depth and pdu.depth < min_depth:
# This is so that we don't notify the user about this
# message, to work around the fact that some events will
# reference really really old events we really don't want to
# send to the clients.
pdu.internal_metadata.outlier = True
- elif min_depth and pdu.depth > min_depth and max_recursion > 0:
- for event_id, hashes in pdu.prev_events:
- if event_id not in have_seen:
- logger.debug(
- "_handle_new_pdu requesting pdu %s",
- event_id
+ elif min_depth and pdu.depth > min_depth:
+ if get_missing and prevs - seen:
+ latest_tuples = yield self.store.get_latest_events_in_room(
+ pdu.room_id
+ )
+
+ # We add the prev events that we have seen to the latest
+ # list to ensure the remote server doesn't give them to us
+ latest = set(e_id for e_id, _, _ in latest_tuples)
+ latest |= seen
+
+ missing_events = yield self.get_missing_events(
+ origin,
+ pdu.room_id,
+ earliest_events=list(latest),
+ latest_events=[pdu.event_id],
+ limit=10,
+ min_depth=min_depth,
+ )
+
+ for e in missing_events:
+ yield self._handle_new_pdu(
+ origin,
+ e,
+ get_missing=False
)
- try:
- new_pdu = yield self.federation_client.get_pdu(
- [origin, pdu.origin],
- event_id=event_id,
- )
-
- if new_pdu:
- yield self._handle_new_pdu(
- origin,
- new_pdu,
- max_recursion=max_recursion-1
- )
-
- logger.debug("Processed pdu %s", event_id)
- else:
- logger.warn("Failed to get PDU %s", event_id)
- fetch_state = True
- except:
- # TODO(erikj): Do some more intelligent retries.
- logger.exception("Failed to get PDU")
- fetch_state = True
- else:
- prevs = {e_id for e_id, _ in pdu.prev_events}
- seen = set(have_seen.keys())
- if prevs - seen:
- fetch_state = True
- else:
- fetch_state = True
+ have_seen = yield self.store.have_events(
+ [ev for ev, _ in pdu.prev_events]
+ )
+
+ prevs = {e_id for e_id, _ in pdu.prev_events}
+ seen = set(have_seen.keys())
+ if prevs - seen:
+ fetch_state = True
if fetch_state:
# We need to get the state at this event, since we haven't
|