diff options
author | Erik Johnston <erik@matrix.org> | 2015-06-25 17:18:19 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-06-25 17:29:34 +0100 |
commit | 5130d80d79fe1f95ce03b8f1cfd4fbf0a32f5ac8 (patch) | |
tree | e16d302641072f6a700d3d65e919c7cb01447e47 /synapse/handlers/federation.py | |
parent | Batch SELECTs in _get_auth_chain_ids_txn (diff) | |
download | synapse-5130d80d79fe1f95ce03b8f1cfd4fbf0a32f5ac8.tar.xz |
Add bulk insert events API
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 227 |
1 files changed, 121 insertions, 106 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b5d882fd65..079f46dffd 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -138,26 +138,29 @@ class FederationHandler(BaseHandler): if state and auth_chain is not None: # If we have any state or auth_chain given to us by the replication # layer, then we should handle them (if we haven't before.) + + event_infos = [] + for e in itertools.chain(auth_chain, state): if e.event_id in seen_ids: continue - e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - seen_ids.add(e.event_id) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + event_infos.append({ + "event": e, + "auth_events": auth, + }) + seen_ids.add(e.event_id) + + yield self._handle_new_events( + origin, + event_infos, + outliers=True + ) try: _, event_stream_id, max_stream_id = yield self._handle_new_event( @@ -292,38 +295,29 @@ class FederationHandler(BaseHandler): ).addErrback(unwrapFirstError) auth_events.update({a.event_id: a for a in results}) - yield defer.gatherResults( - [ - self._handle_new_event( - dest, a, - auth_events={ - (auth_events[a_id].type, auth_events[a_id].state_key): - auth_events[a_id] - for a_id, _ in a.auth_events - }, - ) - for a in auth_events.values() - if a.event_id not in seen_events - ], - consumeErrors=True, - ).addErrback(unwrapFirstError) - - yield defer.gatherResults( - [ - self._handle_new_event( - dest, event_map[e_id], - state=events_to_state[e_id], - backfilled=True, - auth_events={ - (auth_events[a_id].type, auth_events[a_id].state_key): - auth_events[a_id] - for a_id, _ in event_map[e_id].auth_events - }, - ) - for e_id in events_to_state - ], - consumeErrors=True - ).addErrback(unwrapFirstError) + ev_infos = [] + for a in auth_events.values(): + if a.event_id in seen_events: + continue + ev_infos.append({ + "event": a, + "auth_events": { + (auth_events[a_id].type, auth_events[a_id].state_key): + auth_events[a_id] + for a_id, _ in a.auth_events + } + }) + + for e_id in events_to_state: + ev_infos.append({ + "event": event_map[e_id], + "state": events_to_state[e_id], + "auth_events": { + (auth_events[a_id].type, auth_events[a_id].state_key): + auth_events[a_id] + for a_id, _ in event_map[e_id].auth_events + } + }) events.sort(key=lambda e: e.depth) @@ -331,10 +325,14 @@ class FederationHandler(BaseHandler): if event in events_to_state: continue - yield self._handle_new_event( - dest, event, - backfilled=True, - ) + ev_infos.append({ + "event": event, + }) + + yield self._handle_new_events( + dest, ev_infos, + backfilled=True, + ) defer.returnValue(events) @@ -600,32 +598,22 @@ class FederationHandler(BaseHandler): # FIXME pass - yield self._handle_auth_events( - origin, [e for e in auth_chain if e.event_id != event.event_id] - ) - - @defer.inlineCallbacks - def handle_state(e): + ev_infos = [] + for e in itertools.chain(state, auth_chain): if e.event_id == event.event_id: - return + continue e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { + auth_ids = [e_id for e_id, _ in e.auth_events] + ev_infos.append({ + "event": e, + "auth_events": { (e.type, e.state_key): e for e in auth_chain if e.event_id in auth_ids } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + }) - yield defer.DeferredList([handle_state(e) for e in state]) + yield self._handle_new_events(origin, ev_infos, outliers=True) auth_ids = [e_id for e_id, _ in event.auth_events] auth_events = { @@ -940,11 +928,54 @@ class FederationHandler(BaseHandler): def _handle_new_event(self, origin, event, state=None, backfilled=False, current_state=None, auth_events=None): - logger.debug( - "_handle_new_event: %s, sigs: %s", - event.event_id, event.signatures, + outlier = event.internal_metadata.is_outlier() + + context = yield self._prep_event( + origin, event, + state=state, + backfilled=backfilled, + current_state=current_state, + auth_events=auth_events, ) + event_stream_id, max_stream_id = yield self.store.persist_event( + event, + context=context, + backfilled=backfilled, + is_new_state=(not outlier and not backfilled), + current_state=current_state, + ) + + defer.returnValue((context, event_stream_id, max_stream_id)) + + @defer.inlineCallbacks + def _handle_new_events(self, origin, event_infos, backfilled=False, + outliers=False): + contexts = yield defer.gatherResults( + [ + self._prep_event( + origin, + ev_info["event"], + state=ev_info.get("state"), + backfilled=backfilled, + auth_events=ev_info.get("auth_events"), + ) + for ev_info in event_infos + ] + ) + + yield self.store.persist_events( + [ + (ev_info["event"], context) + for ev_info, context in itertools.izip(event_infos, contexts) + ], + backfilled=backfilled, + is_new_state=(not outliers and not backfilled), + ) + + @defer.inlineCallbacks + def _prep_event(self, origin, event, state=None, backfilled=False, + current_state=None, auth_events=None): outlier = event.internal_metadata.is_outlier() context = yield self.state_handler.compute_event_context( @@ -954,13 +985,6 @@ class FederationHandler(BaseHandler): if not auth_events: auth_events = context.current_state - logger.debug( - "_handle_new_event: %s, auth_events: %s", - event.event_id, auth_events, - ) - - is_new_state = not outlier - # This is a hack to fix some old rooms where the initial join event # didn't reference the create event in its auth events. if event.type == EventTypes.Member and not event.auth_events: @@ -984,26 +1008,7 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR - # FIXME: Don't store as rejected with AUTH_ERROR if we haven't - # seen all the auth events. - yield self.store.persist_event( - event, - context=context, - backfilled=backfilled, - is_new_state=False, - current_state=current_state, - ) - raise - - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, - backfilled=backfilled, - is_new_state=(is_new_state and not backfilled), - current_state=current_state, - ) - - defer.returnValue((context, event_stream_id, max_stream_id)) + defer.returnValue(context) @defer.inlineCallbacks def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, @@ -1066,14 +1071,24 @@ class FederationHandler(BaseHandler): @log_function def do_auth(self, origin, event, context, auth_events): # Check if we have all the auth events. - have_events = yield self.store.have_events( - [e_id for e_id, _ in event.auth_events] - ) - + current_state = set(e.event_id for e in auth_events.values()) event_auth_events = set(e_id for e_id, _ in event.auth_events) + + if event_auth_events - current_state: + have_events = yield self.store.have_events( + event_auth_events - current_state + ) + else: + have_events = {} + + have_events.update({ + e.event_id: "" + for e in auth_events.values() + }) + seen_events = set(have_events.keys()) - missing_auth = event_auth_events - seen_events + missing_auth = event_auth_events - seen_events - current_state if missing_auth: logger.info("Missing auth: %s", missing_auth) |