diff options
author | Erik Johnston <erik@matrix.org> | 2015-10-02 09:18:44 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-10-02 09:18:44 +0100 |
commit | 5879edbb097d19c2b5f5e064841909e67d6018fe (patch) | |
tree | 8354812b966160320b13c93059ed2861c97a4ae7 /synapse/handlers/federation.py | |
parent | Merge pull request #291 from matrix-org/receipt-validation (diff) | |
parent | Comment (diff) | |
download | synapse-5879edbb097d19c2b5f5e064841909e67d6018fe.tar.xz |
Merge pull request #283 from matrix-org/erikj/atomic_join_federation
Atomically persist events when joining a room over federation/
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 207 |
1 files changed, 131 insertions, 76 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f4dce712f9..3aff80bf59 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -125,60 +125,72 @@ class FederationHandler(BaseHandler): ) if not is_in_room and not event.internal_metadata.is_outlier(): logger.debug("Got event for room we're not in.") - current_state = state - event_ids = set() - if state: - event_ids |= {e.event_id for e in state} - if auth_chain: - event_ids |= {e.event_id for e in auth_chain} + try: + event_stream_id, max_stream_id = yield self._persist_auth_tree( + auth_chain, state, event + ) + except AuthError as e: + raise FederationError( + "ERROR", + e.code, + e.msg, + affected=event.event_id, + ) - seen_ids = set( - (yield self.store.have_events(event_ids)).keys() - ) + else: + event_ids = set() + if state: + event_ids |= {e.event_id for e in state} + if auth_chain: + event_ids |= {e.event_id for e in auth_chain} + + seen_ids = set( + (yield self.store.have_events(event_ids)).keys() + ) - if state and auth_chain is not None: - # If we have any state or auth_chain given to us by the replication - # layer, then we should handle them (if we haven't before.) + if state and auth_chain is not None: + # If we have any state or auth_chain given to us by the replication + # layer, then we should handle them (if we haven't before.) - event_infos = [] + event_infos = [] - for e in itertools.chain(auth_chain, state): - if e.event_id in seen_ids: - continue - e.internal_metadata.outlier = True - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - event_infos.append({ - "event": e, - "auth_events": auth, - }) - seen_ids.add(e.event_id) + for e in itertools.chain(auth_chain, state): + if e.event_id in seen_ids: + continue + e.internal_metadata.outlier = True + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + event_infos.append({ + "event": e, + "auth_events": auth, + }) + seen_ids.add(e.event_id) - yield self._handle_new_events( - origin, - event_infos, - outliers=True - ) + yield self._handle_new_events( + origin, + event_infos, + outliers=True + ) - try: - _, event_stream_id, max_stream_id = yield self._handle_new_event( - origin, - event, - state=state, - backfilled=backfilled, - current_state=current_state, - ) - except AuthError as e: - raise FederationError( - "ERROR", - e.code, - e.msg, - affected=event.event_id, - ) + try: + _, event_stream_id, max_stream_id = yield self._handle_new_event( + origin, + event, + state=state, + backfilled=backfilled, + current_state=current_state, + ) + except AuthError as e: + raise FederationError( + "ERROR", + e.code, + e.msg, + affected=event.event_id, + ) # if we're receiving valid events from an origin, # it's probably a good idea to mark it as not in retry-state @@ -649,35 +661,8 @@ class FederationHandler(BaseHandler): # FIXME pass - ev_infos = [] - for e in itertools.chain(state, auth_chain): - if e.event_id == event.event_id: - continue - - e.internal_metadata.outlier = True - auth_ids = [e_id for e_id, _ in e.auth_events] - ev_infos.append({ - "event": e, - "auth_events": { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - }) - - yield self._handle_new_events(origin, ev_infos, outliers=True) - - auth_ids = [e_id for e_id, _ in event.auth_events] - auth_events = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - - _, event_stream_id, max_stream_id = yield self._handle_new_event( - origin, - new_event, - state=state, - current_state=state, - auth_events=auth_events, + event_stream_id, max_stream_id = yield self._persist_auth_tree( + auth_chain, state, event ) with PreserveLoggingContext(): @@ -1027,6 +1012,76 @@ class FederationHandler(BaseHandler): ) @defer.inlineCallbacks + def _persist_auth_tree(self, auth_events, state, event): + """Checks the auth chain is valid (and passes auth checks) for the + state and event. Then persists the auth chain and state atomically. + Persists the event seperately. + + Returns: + 2-tuple of (event_stream_id, max_stream_id) from the persist_event + call for `event` + """ + events_to_context = {} + for e in itertools.chain(auth_events, state): + ctx = yield self.state_handler.compute_event_context( + e, outlier=True, + ) + events_to_context[e.event_id] = ctx + e.internal_metadata.outlier = True + + event_map = { + e.event_id: e + for e in auth_events + } + + create_event = None + for e in auth_events: + if (e.type, e.state_key) == (EventTypes.Create, ""): + create_event = e + break + + for e in itertools.chain(auth_events, state, [event]): + auth_for_e = { + (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id] + for e_id, _ in e.auth_events + } + if create_event: + auth_for_e[(EventTypes.Create, "")] = create_event + + try: + self.auth.check(e, auth_events=auth_for_e) + except AuthError as err: + logger.warn( + "Rejecting %s because %s", + e.event_id, err.msg + ) + + if e == event: + raise + events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR + + yield self.store.persist_events( + [ + (e, events_to_context[e.event_id]) + for e in itertools.chain(auth_events, state) + ], + is_new_state=False, + ) + + new_event_context = yield self.state_handler.compute_event_context( + event, old_state=state, outlier=False, + ) + + event_stream_id, max_stream_id = yield self.store.persist_event( + event, new_event_context, + backfilled=False, + is_new_state=True, + current_state=state, + ) + + defer.returnValue((event_stream_id, max_stream_id)) + + @defer.inlineCallbacks def _prep_event(self, origin, event, state=None, backfilled=False, current_state=None, auth_events=None): outlier = event.internal_metadata.is_outlier() |