diff options
author | Erik Johnston <erik@matrix.org> | 2015-06-22 11:42:10 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-06-22 11:42:10 +0100 |
commit | 96be533f1f7dde8a0eb62d26c05f50f5e517b90a (patch) | |
tree | df62a82eab17e00aa464ba6c023b7d6bfb86da5e | |
parent | Properly cache get_server_verify_keys (diff) | |
download | synapse-96be533f1f7dde8a0eb62d26c05f50f5e517b90a.tar.xz |
Use new store.persist_events function in federation handler
-rw-r--r-- | synapse/handlers/federation.py | 189 | ||||
-rw-r--r-- | synapse/storage/events.py | 11 |
2 files changed, 130 insertions, 70 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c8baf90b4a..214f02a188 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -138,26 +138,25 @@ class FederationHandler(BaseHandler): if state and auth_chain is not None: # If we have any state or auth_chain given to us by the replication # layer, then we should handle them (if we haven't before.) + + event_infos = [] + for e in itertools.chain(auth_chain, state): if e.event_id in seen_ids: continue - e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - seen_ids.add(e.event_id) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + event_infos.append[{ + "event": e, + "auth_events": auth, + }] + seen_ids.add(e.event_id) + + yield self._handle_new_events(origin, event_infos) try: _, event_stream_id, max_stream_id = yield self._handle_new_event( @@ -292,38 +291,29 @@ class FederationHandler(BaseHandler): ).addErrback(unwrapFirstError) auth_events.update({a.event_id: a for a in results}) - yield defer.gatherResults( - [ - self._handle_new_event( - dest, a, - auth_events={ - (auth_events[a_id].type, auth_events[a_id].state_key): - auth_events[a_id] - for a_id, _ in a.auth_events - }, - ) - for a in auth_events.values() - if a.event_id not in seen_events - ], - consumeErrors=True, - ).addErrback(unwrapFirstError) - - yield defer.gatherResults( - [ - self._handle_new_event( - dest, event_map[e_id], - state=events_to_state[e_id], - backfilled=True, - auth_events={ - (auth_events[a_id].type, auth_events[a_id].state_key): - auth_events[a_id] - for a_id, _ in event_map[e_id].auth_events - }, - ) - for e_id in events_to_state - ], - consumeErrors=True - ).addErrback(unwrapFirstError) + ev_infos = [] + for a in auth_events.values(): + if a.event_id in seen_events: + continue + ev_infos.append({ + "event": a, + "auth_events": { + (auth_events[a_id].type, auth_events[a_id].state_key): + auth_events[a_id] + for a_id, _ in a.auth_events + } + }) + + for e_id in events_to_state: + ev_infos.append({ + "event": event_map[e_id], + "state": events_to_state[e_id], + "auth_events": { + (auth_events[a_id].type, auth_events[a_id].state_key): + auth_events[a_id] + for a_id, _ in event_map[e_id].auth_events + } + }) events.sort(key=lambda e: e.depth) @@ -331,10 +321,14 @@ class FederationHandler(BaseHandler): if event in events_to_state: continue - yield self._handle_new_event( - dest, event, - backfilled=True, - ) + ev_infos.append({ + "event": event, + }) + + yield self._handle_new_events( + dest, ev_infos, + backfilled=True, + ) defer.returnValue(events) @@ -600,32 +594,26 @@ class FederationHandler(BaseHandler): # FIXME pass - yield self._handle_auth_events( - origin, [e for e in auth_chain if e.event_id != event.event_id] - ) + # yield self._handle_auth_events( + # origin, [e for e in auth_chain if e.event_id != event.event_id] + # ) - @defer.inlineCallbacks - def handle_state(e): + ev_infos = [] + for e in itertools.chain(state, auth_chain): if e.event_id == event.event_id: - return + continue e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { + auth_ids = [e_id for e_id, _ in e.auth_events] + ev_infos.append({ + "event": e, + "auth_events": { (e.type, e.state_key): e for e in auth_chain if e.event_id in auth_ids } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + }) - yield defer.DeferredList([handle_state(e) for e in state]) + yield self._handle_new_events(origin, ev_infos) auth_ids = [e_id for e_id, _ in event.auth_events] auth_events = { @@ -1006,6 +994,67 @@ class FederationHandler(BaseHandler): defer.returnValue((context, event_stream_id, max_stream_id)) @defer.inlineCallbacks + def _handle_new_events(self, origin, event_infos, backfilled=False): + contexts = yield defer.gatherResults( + [ + self._prep_event( + origin, + ev_info["event"], + state=ev_info.get("state"), + backfilled=backfilled, + auth_events=ev_info.get("auth_events"), + ) + for ev_info in event_infos + ] + ) + + yield self.store.persist_events( + [ + (ev_info["event"], context) + for ev_info, context in itertools.izip(event_infos, contexts) + ], + backfilled=backfilled, + is_new_state=(not backfilled), + ) + + @defer.inlineCallbacks + def _prep_event(self, origin, event, state=None, backfilled=False, + current_state=None, auth_events=None): + outlier = event.internal_metadata.is_outlier() + + context = yield self.state_handler.compute_event_context( + event, old_state=state, outlier=outlier, + ) + + if not auth_events: + auth_events = context.current_state + + # This is a hack to fix some old rooms where the initial join event + # didn't reference the create event in its auth events. + if event.type == EventTypes.Member and not event.auth_events: + if len(event.prev_events) == 1 and event.depth < 5: + c = yield self.store.get_event( + event.prev_events[0][0], + allow_none=True, + ) + if c and c.type == EventTypes.Create: + auth_events[(c.type, c.state_key)] = c + + try: + yield self.do_auth( + origin, event, context, auth_events=auth_events + ) + except AuthError as e: + logger.warn( + "Rejecting %s because %s", + event.event_id, e.msg + ) + + context.rejected = RejectedReason.AUTH_ERROR + + defer.returnValue(context) + + @defer.inlineCallbacks def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, missing): # Just go through and process each event in `remote_auth_chain`. We diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5fe11cf3fb..e02a8066d6 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -45,6 +45,17 @@ EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events class EventsStore(SQLBaseStore): + def persist_events(self, events_and_contexts, backfilled=False, + is_new_state=True): + return defer.gatherResults([ + self.persist_event( + event, context, + backfilled=backfilled, + is_new_state=is_new_state, + ) + for event, context in events_and_contexts + ]) + @defer.inlineCallbacks @log_function def persist_event(self, event, context, backfilled=False, |