diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8d99101619..8197d8b2d0 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -125,60 +125,72 @@ class FederationHandler(BaseHandler):
)
if not is_in_room and not event.internal_metadata.is_outlier():
logger.debug("Got event for room we're not in.")
- current_state = state
- event_ids = set()
- if state:
- event_ids |= {e.event_id for e in state}
- if auth_chain:
- event_ids |= {e.event_id for e in auth_chain}
+ try:
+ event_stream_id, max_stream_id = yield self._persist_auth_tree(
+ auth_chain, state, event
+ )
+ except AuthError as e:
+ raise FederationError(
+ "ERROR",
+ e.code,
+ e.msg,
+ affected=event.event_id,
+ )
- seen_ids = set(
- (yield self.store.have_events(event_ids)).keys()
- )
+ else:
+ event_ids = set()
+ if state:
+ event_ids |= {e.event_id for e in state}
+ if auth_chain:
+ event_ids |= {e.event_id for e in auth_chain}
+
+ seen_ids = set(
+ (yield self.store.have_events(event_ids)).keys()
+ )
- if state and auth_chain is not None:
- # If we have any state or auth_chain given to us by the replication
- # layer, then we should handle them (if we haven't before.)
+ if state and auth_chain is not None:
+ # If we have any state or auth_chain given to us by the replication
+ # layer, then we should handle them (if we haven't before.)
- event_infos = []
+ event_infos = []
- for e in itertools.chain(auth_chain, state):
- if e.event_id in seen_ids:
- continue
- e.internal_metadata.outlier = True
- auth_ids = [e_id for e_id, _ in e.auth_events]
- auth = {
- (e.type, e.state_key): e for e in auth_chain
- if e.event_id in auth_ids
- }
- event_infos.append({
- "event": e,
- "auth_events": auth,
- })
- seen_ids.add(e.event_id)
+ for e in itertools.chain(auth_chain, state):
+ if e.event_id in seen_ids:
+ continue
+ e.internal_metadata.outlier = True
+ auth_ids = [e_id for e_id, _ in e.auth_events]
+ auth = {
+ (e.type, e.state_key): e for e in auth_chain
+ if e.event_id in auth_ids
+ }
+ event_infos.append({
+ "event": e,
+ "auth_events": auth,
+ })
+ seen_ids.add(e.event_id)
- yield self._handle_new_events(
- origin,
- event_infos,
- outliers=True
- )
+ yield self._handle_new_events(
+ origin,
+ event_infos,
+ outliers=True
+ )
- try:
- _, event_stream_id, max_stream_id = yield self._handle_new_event(
- origin,
- event,
- state=state,
- backfilled=backfilled,
- current_state=current_state,
- )
- except AuthError as e:
- raise FederationError(
- "ERROR",
- e.code,
- e.msg,
- affected=event.event_id,
- )
+ try:
+ _, event_stream_id, max_stream_id = yield self._handle_new_event(
+ origin,
+ event,
+ state=state,
+ backfilled=backfilled,
+ current_state=current_state,
+ )
+ except AuthError as e:
+ raise FederationError(
+ "ERROR",
+ e.code,
+ e.msg,
+ affected=event.event_id,
+ )
# if we're receiving valid events from an origin,
# it's probably a good idea to mark it as not in retry-state
@@ -650,35 +662,8 @@ class FederationHandler(BaseHandler):
# FIXME
pass
- ev_infos = []
- for e in itertools.chain(state, auth_chain):
- if e.event_id == event.event_id:
- continue
-
- e.internal_metadata.outlier = True
- auth_ids = [e_id for e_id, _ in e.auth_events]
- ev_infos.append({
- "event": e,
- "auth_events": {
- (e.type, e.state_key): e for e in auth_chain
- if e.event_id in auth_ids
- }
- })
-
- yield self._handle_new_events(origin, ev_infos, outliers=True)
-
- auth_ids = [e_id for e_id, _ in event.auth_events]
- auth_events = {
- (e.type, e.state_key): e for e in auth_chain
- if e.event_id in auth_ids
- }
-
- _, event_stream_id, max_stream_id = yield self._handle_new_event(
- origin,
- new_event,
- state=state,
- current_state=state,
- auth_events=auth_events,
+ event_stream_id, max_stream_id = yield self._persist_auth_tree(
+ auth_chain, state, event
)
with PreserveLoggingContext():
@@ -1035,6 +1020,76 @@ class FederationHandler(BaseHandler):
)
@defer.inlineCallbacks
+ def _persist_auth_tree(self, auth_events, state, event):
+ """Checks the auth chain is valid (and passes auth checks) for the
+ state and event. Then persists the auth chain and state atomically.
+ Persists the event seperately.
+
+ Returns:
+ 2-tuple of (event_stream_id, max_stream_id) from the persist_event
+ call for `event`
+ """
+ events_to_context = {}
+ for e in itertools.chain(auth_events, state):
+ ctx = yield self.state_handler.compute_event_context(
+ e, outlier=True,
+ )
+ events_to_context[e.event_id] = ctx
+ e.internal_metadata.outlier = True
+
+ event_map = {
+ e.event_id: e
+ for e in auth_events
+ }
+
+ create_event = None
+ for e in auth_events:
+ if (e.type, e.state_key) == (EventTypes.Create, ""):
+ create_event = e
+ break
+
+ for e in itertools.chain(auth_events, state, [event]):
+ auth_for_e = {
+ (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
+ for e_id, _ in e.auth_events
+ }
+ if create_event:
+ auth_for_e[(EventTypes.Create, "")] = create_event
+
+ try:
+ self.auth.check(e, auth_events=auth_for_e)
+ except AuthError as err:
+ logger.warn(
+ "Rejecting %s because %s",
+ e.event_id, err.msg
+ )
+
+ if e == event:
+ raise
+ events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
+
+ yield self.store.persist_events(
+ [
+ (e, events_to_context[e.event_id])
+ for e in itertools.chain(auth_events, state)
+ ],
+ is_new_state=False,
+ )
+
+ new_event_context = yield self.state_handler.compute_event_context(
+ event, old_state=state, outlier=False,
+ )
+
+ event_stream_id, max_stream_id = yield self.store.persist_event(
+ event, new_event_context,
+ backfilled=False,
+ is_new_state=True,
+ current_state=state,
+ )
+
+ defer.returnValue((event_stream_id, max_stream_id))
+
+ @defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, backfilled=False,
current_state=None, auth_events=None):
outlier = event.internal_metadata.is_outlier()
|