diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b5d882fd65..079f46dffd 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -138,26 +138,29 @@ class FederationHandler(BaseHandler):
if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication
# layer, then we should handle them (if we haven't before.)
+
+ event_infos = []
+
for e in itertools.chain(auth_chain, state):
if e.event_id in seen_ids:
continue
-
e.internal_metadata.outlier = True
- try:
- auth_ids = [e_id for e_id, _ in e.auth_events]
- auth = {
- (e.type, e.state_key): e for e in auth_chain
- if e.event_id in auth_ids
- }
- yield self._handle_new_event(
- origin, e, auth_events=auth
- )
- seen_ids.add(e.event_id)
- except:
- logger.exception(
- "Failed to handle state event %s",
- e.event_id,
- )
+ auth_ids = [e_id for e_id, _ in e.auth_events]
+ auth = {
+ (e.type, e.state_key): e for e in auth_chain
+ if e.event_id in auth_ids
+ }
+ event_infos.append({
+ "event": e,
+ "auth_events": auth,
+ })
+ seen_ids.add(e.event_id)
+
+ yield self._handle_new_events(
+ origin,
+ event_infos,
+ outliers=True
+ )
try:
_, event_stream_id, max_stream_id = yield self._handle_new_event(
@@ -292,38 +295,29 @@ class FederationHandler(BaseHandler):
).addErrback(unwrapFirstError)
auth_events.update({a.event_id: a for a in results})
- yield defer.gatherResults(
- [
- self._handle_new_event(
- dest, a,
- auth_events={
- (auth_events[a_id].type, auth_events[a_id].state_key):
- auth_events[a_id]
- for a_id, _ in a.auth_events
- },
- )
- for a in auth_events.values()
- if a.event_id not in seen_events
- ],
- consumeErrors=True,
- ).addErrback(unwrapFirstError)
-
- yield defer.gatherResults(
- [
- self._handle_new_event(
- dest, event_map[e_id],
- state=events_to_state[e_id],
- backfilled=True,
- auth_events={
- (auth_events[a_id].type, auth_events[a_id].state_key):
- auth_events[a_id]
- for a_id, _ in event_map[e_id].auth_events
- },
- )
- for e_id in events_to_state
- ],
- consumeErrors=True
- ).addErrback(unwrapFirstError)
+ ev_infos = []
+ for a in auth_events.values():
+ if a.event_id in seen_events:
+ continue
+ ev_infos.append({
+ "event": a,
+ "auth_events": {
+ (auth_events[a_id].type, auth_events[a_id].state_key):
+ auth_events[a_id]
+ for a_id, _ in a.auth_events
+ }
+ })
+
+ for e_id in events_to_state:
+ ev_infos.append({
+ "event": event_map[e_id],
+ "state": events_to_state[e_id],
+ "auth_events": {
+ (auth_events[a_id].type, auth_events[a_id].state_key):
+ auth_events[a_id]
+ for a_id, _ in event_map[e_id].auth_events
+ }
+ })
events.sort(key=lambda e: e.depth)
@@ -331,10 +325,14 @@ class FederationHandler(BaseHandler):
if event in events_to_state:
continue
- yield self._handle_new_event(
- dest, event,
- backfilled=True,
- )
+ ev_infos.append({
+ "event": event,
+ })
+
+ yield self._handle_new_events(
+ dest, ev_infos,
+ backfilled=True,
+ )
defer.returnValue(events)
@@ -600,32 +598,22 @@ class FederationHandler(BaseHandler):
# FIXME
pass
- yield self._handle_auth_events(
- origin, [e for e in auth_chain if e.event_id != event.event_id]
- )
-
- @defer.inlineCallbacks
- def handle_state(e):
+ ev_infos = []
+ for e in itertools.chain(state, auth_chain):
if e.event_id == event.event_id:
- return
+ continue
e.internal_metadata.outlier = True
- try:
- auth_ids = [e_id for e_id, _ in e.auth_events]
- auth = {
+ auth_ids = [e_id for e_id, _ in e.auth_events]
+ ev_infos.append({
+ "event": e,
+ "auth_events": {
(e.type, e.state_key): e for e in auth_chain
if e.event_id in auth_ids
}
- yield self._handle_new_event(
- origin, e, auth_events=auth
- )
- except:
- logger.exception(
- "Failed to handle state event %s",
- e.event_id,
- )
+ })
- yield defer.DeferredList([handle_state(e) for e in state])
+ yield self._handle_new_events(origin, ev_infos, outliers=True)
auth_ids = [e_id for e_id, _ in event.auth_events]
auth_events = {
@@ -940,11 +928,54 @@ class FederationHandler(BaseHandler):
def _handle_new_event(self, origin, event, state=None, backfilled=False,
current_state=None, auth_events=None):
- logger.debug(
- "_handle_new_event: %s, sigs: %s",
- event.event_id, event.signatures,
+ outlier = event.internal_metadata.is_outlier()
+
+ context = yield self._prep_event(
+ origin, event,
+ state=state,
+ backfilled=backfilled,
+ current_state=current_state,
+ auth_events=auth_events,
)
+ event_stream_id, max_stream_id = yield self.store.persist_event(
+ event,
+ context=context,
+ backfilled=backfilled,
+ is_new_state=(not outlier and not backfilled),
+ current_state=current_state,
+ )
+
+ defer.returnValue((context, event_stream_id, max_stream_id))
+
+ @defer.inlineCallbacks
+ def _handle_new_events(self, origin, event_infos, backfilled=False,
+ outliers=False):
+ contexts = yield defer.gatherResults(
+ [
+ self._prep_event(
+ origin,
+ ev_info["event"],
+ state=ev_info.get("state"),
+ backfilled=backfilled,
+ auth_events=ev_info.get("auth_events"),
+ )
+ for ev_info in event_infos
+ ]
+ )
+
+ yield self.store.persist_events(
+ [
+ (ev_info["event"], context)
+ for ev_info, context in itertools.izip(event_infos, contexts)
+ ],
+ backfilled=backfilled,
+ is_new_state=(not outliers and not backfilled),
+ )
+
+ @defer.inlineCallbacks
+ def _prep_event(self, origin, event, state=None, backfilled=False,
+ current_state=None, auth_events=None):
outlier = event.internal_metadata.is_outlier()
context = yield self.state_handler.compute_event_context(
@@ -954,13 +985,6 @@ class FederationHandler(BaseHandler):
if not auth_events:
auth_events = context.current_state
- logger.debug(
- "_handle_new_event: %s, auth_events: %s",
- event.event_id, auth_events,
- )
-
- is_new_state = not outlier
-
# This is a hack to fix some old rooms where the initial join event
# didn't reference the create event in its auth events.
if event.type == EventTypes.Member and not event.auth_events:
@@ -984,26 +1008,7 @@ class FederationHandler(BaseHandler):
context.rejected = RejectedReason.AUTH_ERROR
- # FIXME: Don't store as rejected with AUTH_ERROR if we haven't
- # seen all the auth events.
- yield self.store.persist_event(
- event,
- context=context,
- backfilled=backfilled,
- is_new_state=False,
- current_state=current_state,
- )
- raise
-
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
- backfilled=backfilled,
- is_new_state=(is_new_state and not backfilled),
- current_state=current_state,
- )
-
- defer.returnValue((context, event_stream_id, max_stream_id))
+ defer.returnValue(context)
@defer.inlineCallbacks
def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
@@ -1066,14 +1071,24 @@ class FederationHandler(BaseHandler):
@log_function
def do_auth(self, origin, event, context, auth_events):
# Check if we have all the auth events.
- have_events = yield self.store.have_events(
- [e_id for e_id, _ in event.auth_events]
- )
-
+ current_state = set(e.event_id for e in auth_events.values())
event_auth_events = set(e_id for e_id, _ in event.auth_events)
+
+ if event_auth_events - current_state:
+ have_events = yield self.store.have_events(
+ event_auth_events - current_state
+ )
+ else:
+ have_events = {}
+
+ have_events.update({
+ e.event_id: ""
+ for e in auth_events.values()
+ })
+
seen_events = set(have_events.keys())
- missing_auth = event_auth_events - seen_events
+ missing_auth = event_auth_events - seen_events - current_state
if missing_auth:
logger.info("Missing auth: %s", missing_auth)
|