diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b883c64f1b..caaacecfb7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -65,7 +65,6 @@ from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRes
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.types import UserID, get_domain_from_id
-from synapse.util import batch_iter, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.distributor import user_joined_room
from synapse.util.retryutils import NotRetryingDestination
@@ -238,7 +237,6 @@ class FederationHandler(BaseHandler):
return None
state = None
- auth_chain = []
# Get missing pdus if necessary.
if not pdu.internal_metadata.is_outlier():
@@ -342,7 +340,6 @@ class FederationHandler(BaseHandler):
# Calculate the state after each of the previous events, and
# resolve them to find the correct state at the current event.
- auth_chains = set()
event_map = {event_id: pdu}
try:
# Get the state of the events we know about
@@ -366,24 +363,14 @@ class FederationHandler(BaseHandler):
p,
)
- room_version = await self.store.get_room_version(room_id)
-
with nested_logging_context(p):
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
# by the get_pdu_cache in federation_client.
- (
- remote_state,
- got_auth_chain,
- ) = await self._get_state_for_room(
+ (remote_state, _,) = await self._get_state_for_room(
origin, room_id, p, include_event_in_state=True
)
- # XXX hrm I'm not convinced that duplicate events will compare
- # for equality, so I'm not sure this does what the author
- # hoped.
- auth_chains.update(got_auth_chain)
-
remote_state_map = {
(x.type, x.state_key): x.event_id for x in remote_state
}
@@ -392,6 +379,7 @@ class FederationHandler(BaseHandler):
for x in remote_state:
event_map[x.event_id] = x
+ room_version = await self.store.get_room_version(room_id)
state_map = await resolve_events_with_store(
room_id,
room_version,
@@ -413,7 +401,6 @@ class FederationHandler(BaseHandler):
event_map.update(evs)
state = [event_map[e] for e in six.itervalues(state_map)]
- auth_chain = list(auth_chains)
except Exception:
logger.warning(
"[%s %s] Error attempting to resolve state at missing "
@@ -429,9 +416,7 @@ class FederationHandler(BaseHandler):
affected=event_id,
)
- await self._process_received_pdu(
- origin, pdu, state=state, auth_chain=auth_chain
- )
+ await self._process_received_pdu(origin, pdu, state=state)
async def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
"""
@@ -633,6 +618,8 @@ class FederationHandler(BaseHandler):
room_id (str)
event_ids (Iterable[str])
+ Persists any events we don't already have as outliers.
+
If we fail to fetch any of the events, a warning will be logged, and the event
will be omitted from the result. Likewise, any events which turn out not to
be in the given room.
@@ -652,27 +639,15 @@ class FederationHandler(BaseHandler):
room_id,
)
- room_version = yield self.store.get_room_version(room_id)
-
- # XXX 20 requests at once? really?
- for batch in batch_iter(missing_events, 20):
- deferreds = [
- run_in_background(
- self.federation_client.get_pdu,
- destinations=[destination],
- event_id=e_id,
- room_version=room_version,
- )
- for e_id in batch
- ]
-
- res = yield make_deferred_yieldable(
- defer.DeferredList(deferreds, consumeErrors=True)
- )
+ yield self._get_events_and_persist(
+ destination=destination, room_id=room_id, events=missing_events
+ )
- for success, result in res:
- if success and result:
- fetched_events[result.event_id] = result
+ # we need to make sure we re-load from the database to get the rejected
+ # state correct.
+ fetched_events.update(
+ (yield self.store.get_events(missing_events, allow_rejected=True))
+ )
# check for events which were in the wrong room.
#
@@ -702,7 +677,7 @@ class FederationHandler(BaseHandler):
return fetched_events
@defer.inlineCallbacks
- def _process_received_pdu(self, origin, event, state, auth_chain):
+ def _process_received_pdu(self, origin, event, state):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
|