diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 38bebbf598..d05b63673f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -106,7 +106,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
- self.store = hs.get_datastore()
+ self.store = hs.get_datastore() # type: synapse.storage.DataStore
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
@@ -323,14 +323,22 @@ class FederationHandler(BaseHandler):
affected=pdu.event_id,
)
- # Calculate the state of the previous events, and
- # de-conflict them to find the current state.
- state_groups = []
+ # Calculate the state after each of the previous events, and
+ # resolve them to find the correct state at the current event.
auth_chains = set()
+ event_map = {
+ event_id: pdu,
+ }
try:
# Get the state of the events we know about
- ours = yield self.store.get_state_groups(room_id, list(seen))
- state_groups.append(ours)
+ ours = yield self.store.get_state_groups_ids(room_id, seen)
+
+ # state_maps is a list of mappings from (type, state_key) to event_id
+ # type: list[dict[tuple[str, str], str]]
+ state_maps = list(ours.values())
+
+ # we don't need this any more, let's delete it.
+ del ours
# Ask the remote server for the states we don't
# know about
@@ -350,28 +358,54 @@ class FederationHandler(BaseHandler):
)
)
+ # we want the state *after* p; get_state_for_room returns the
+ # state *before* p.
+ remote_event = yield self.federation_client.get_pdu(
+ [origin], p, outlier=True,
+ )
+
+ if remote_event is None:
+ raise Exception(
+ "Unable to get missing prev_event %s" % (p, )
+ )
+
+ if remote_event.is_state():
+ remote_state.append(remote_event)
+
# XXX hrm I'm not convinced that duplicate events will compare
# for equality, so I'm not sure this does what the author
# hoped.
auth_chains.update(got_auth_chain)
- state_group = {
+ remote_state_map = {
(x.type, x.state_key): x.event_id for x in remote_state
}
- state_groups.append(state_group)
+ state_maps.append(remote_state_map)
+
+ for x in remote_state:
+ event_map[x.event_id] = x
# Resolve any conflicting state
+ @defer.inlineCallbacks
def fetch(ev_ids):
- return self.store.get_events(
- ev_ids, get_prev_content=False, check_redacted=False
+ fetched = yield self.store.get_events(
+ ev_ids, get_prev_content=False, check_redacted=False,
)
+ # add any events we fetch here to the `event_map` so that we
+ # can use them to build the state event list below.
+ event_map.update(fetched)
+ defer.returnValue(fetched)
room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_factory(
- room_version, state_groups, {event_id: pdu}, fetch
+ room_version, state_maps, event_map, fetch,
)
- state = (yield self.store.get_events(state_map.values())).values()
+ # we need to give _process_received_pdu the actual state events
+ # rather than event ids, so generate that now.
+ state = [
+ event_map[e] for e in six.itervalues(state_map)
+ ]
auth_chain = list(auth_chains)
except Exception:
logger.warn(
|