diff options
author | Amber Brown <hawkowl@atleastfornow.net> | 2019-06-20 19:32:02 +1000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-06-20 19:32:02 +1000 |
commit | 32e7c9e7f20b57dd081023ac42d6931a8da9b3a3 (patch) | |
tree | 139ef30c957535699d1ae0474e8b5ba2517196b2 /synapse/handlers/federation.py | |
parent | Merge pull request #5042 from matrix-org/erikj/fix_get_missing_events_error (diff) | |
download | synapse-32e7c9e7f20b57dd081023ac42d6931a8da9b3a3.tar.xz |
Run Black. (#5482)
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 884 |
1 files changed, 363 insertions, 521 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d5a605d3bd..02d397c498 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -82,7 +82,7 @@ def shortstr(iterable, maxitems=5): items = list(itertools.islice(iterable, maxitems + 1)) if len(items) <= maxitems: return str(items) - return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]" + return "[" + ", ".join(repr(r) for r in items[:maxitems]) + ", ...]" class FederationHandler(BaseHandler): @@ -115,14 +115,14 @@ class FederationHandler(BaseHandler): self.config = hs.config self.http_client = hs.get_simple_http_client() - self._send_events_to_master = ( - ReplicationFederationSendEventsRestServlet.make_client(hs) + self._send_events_to_master = ReplicationFederationSendEventsRestServlet.make_client( + hs ) - self._notify_user_membership_change = ( - ReplicationUserJoinedLeftRoomRestServlet.make_client(hs) + self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client( + hs ) - self._clean_room_for_join_client = ( - ReplicationCleanRoomRestServlet.make_client(hs) + self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client( + hs ) # When joining a room we need to queue any events for that room up @@ -132,9 +132,7 @@ class FederationHandler(BaseHandler): self.third_party_event_rules = hs.get_third_party_event_rules() @defer.inlineCallbacks - def on_receive_pdu( - self, origin, pdu, sent_to_us_directly=False, - ): + def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False): """ Process a PDU received via a federation /send/ transaction, or via backfill of missing prev_events @@ -151,26 +149,19 @@ class FederationHandler(BaseHandler): room_id = pdu.room_id event_id = pdu.event_id - logger.info( - "[%s %s] handling received PDU: %s", - room_id, event_id, pdu, - ) + logger.info("[%s %s] handling received PDU: %s", room_id, event_id, pdu) # We reprocess pdus when we have seen them only as outliers existing = yield self.store.get_event( - event_id, - allow_none=True, - allow_rejected=True, + event_id, allow_none=True, allow_rejected=True ) # FIXME: Currently we fetch an event again when we already have it # if it has been marked as an outlier. - already_seen = ( - existing and ( - not existing.internal_metadata.is_outlier() - or pdu.internal_metadata.is_outlier() - ) + already_seen = existing and ( + not existing.internal_metadata.is_outlier() + or pdu.internal_metadata.is_outlier() ) if already_seen: logger.debug("[%s %s]: Already seen pdu", room_id, event_id) @@ -182,20 +173,19 @@ class FederationHandler(BaseHandler): try: self._sanity_check_event(pdu) except SynapseError as err: - logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id) - raise FederationError( - "ERROR", - err.code, - err.msg, - affected=pdu.event_id, + logger.warn( + "[%s %s] Received event failed sanity checks", room_id, event_id ) + raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id) # If we are currently in the process of joining this room, then we # queue up events for later processing. if room_id in self.room_queues: logger.info( "[%s %s] Queuing PDU from %s for now: join in progress", - room_id, event_id, origin, + room_id, + event_id, + origin, ) self.room_queues[room_id].append((pdu, origin)) return @@ -206,14 +196,13 @@ class FederationHandler(BaseHandler): # # Note that if we were never in the room then we would have already # dropped the event, since we wouldn't know the room version. - is_in_room = yield self.auth.check_host_in_room( - room_id, - self.server_name - ) + is_in_room = yield self.auth.check_host_in_room(room_id, self.server_name) if not is_in_room: logger.info( "[%s %s] Ignoring PDU from %s as we're not in the room", - room_id, event_id, origin, + room_id, + event_id, + origin, ) defer.returnValue(None) @@ -223,14 +212,9 @@ class FederationHandler(BaseHandler): # Get missing pdus if necessary. if not pdu.internal_metadata.is_outlier(): # We only backfill backwards to the min depth. - min_depth = yield self.get_min_depth_for_context( - pdu.room_id - ) + min_depth = yield self.get_min_depth_for_context(pdu.room_id) - logger.debug( - "[%s %s] min_depth: %d", - room_id, event_id, min_depth, - ) + logger.debug("[%s %s] min_depth: %d", room_id, event_id, min_depth) prevs = set(pdu.prev_event_ids()) seen = yield self.store.have_seen_events(prevs) @@ -248,12 +232,17 @@ class FederationHandler(BaseHandler): # at a time. logger.info( "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s", - room_id, event_id, len(missing_prevs), shortstr(missing_prevs), + room_id, + event_id, + len(missing_prevs), + shortstr(missing_prevs), ) with (yield self._room_pdu_linearizer.queue(pdu.room_id)): logger.info( "[%s %s] Acquired room lock to fetch %d missing prev_events", - room_id, event_id, len(missing_prevs), + room_id, + event_id, + len(missing_prevs), ) yield self._get_missing_events_for_pdu( @@ -267,12 +256,16 @@ class FederationHandler(BaseHandler): if not prevs - seen: logger.info( "[%s %s] Found all missing prev_events", - room_id, event_id, + room_id, + event_id, ) elif missing_prevs: logger.info( "[%s %s] Not recursively fetching %d missing prev_events: %s", - room_id, event_id, len(missing_prevs), shortstr(missing_prevs), + room_id, + event_id, + len(missing_prevs), + shortstr(missing_prevs), ) if prevs - seen: @@ -303,7 +296,10 @@ class FederationHandler(BaseHandler): if sent_to_us_directly: logger.warn( "[%s %s] Rejecting: failed to fetch %d prev events: %s", - room_id, event_id, len(prevs - seen), shortstr(prevs - seen) + room_id, + event_id, + len(prevs - seen), + shortstr(prevs - seen), ) raise FederationError( "ERROR", @@ -318,9 +314,7 @@ class FederationHandler(BaseHandler): # Calculate the state after each of the previous events, and # resolve them to find the correct state at the current event. auth_chains = set() - event_map = { - event_id: pdu, - } + event_map = {event_id: pdu} try: # Get the state of the events we know about ours = yield self.store.get_state_groups_ids(room_id, seen) @@ -337,7 +331,9 @@ class FederationHandler(BaseHandler): for p in prevs - seen: logger.info( "[%s %s] Requesting state at missing prev_event %s", - room_id, event_id, p, + room_id, + event_id, + p, ) room_version = yield self.store.get_room_version(room_id) @@ -348,19 +344,19 @@ class FederationHandler(BaseHandler): # by the get_pdu_cache in federation_client. remote_state, got_auth_chain = ( yield self.federation_client.get_state_for_room( - origin, room_id, p, + origin, room_id, p ) ) # we want the state *after* p; get_state_for_room returns the # state *before* p. remote_event = yield self.federation_client.get_pdu( - [origin], p, room_version, outlier=True, + [origin], p, room_version, outlier=True ) if remote_event is None: raise Exception( - "Unable to get missing prev_event %s" % (p, ) + "Unable to get missing prev_event %s" % (p,) ) if remote_event.is_state(): @@ -380,7 +376,9 @@ class FederationHandler(BaseHandler): event_map[x.event_id] = x state_map = yield resolve_events_with_store( - room_version, state_maps, event_map, + room_version, + state_maps, + event_map, state_res_store=StateResolutionStore(self.store), ) @@ -396,15 +394,15 @@ class FederationHandler(BaseHandler): ) event_map.update(evs) - state = [ - event_map[e] for e in six.itervalues(state_map) - ] + state = [event_map[e] for e in six.itervalues(state_map)] auth_chain = list(auth_chains) except Exception: logger.warn( "[%s %s] Error attempting to resolve state at missing " "prev_events", - room_id, event_id, exc_info=True, + room_id, + event_id, + exc_info=True, ) raise FederationError( "ERROR", @@ -414,10 +412,7 @@ class FederationHandler(BaseHandler): ) yield self._process_received_pdu( - origin, - pdu, - state=state, - auth_chain=auth_chain, + origin, pdu, state=state, auth_chain=auth_chain ) @defer.inlineCallbacks @@ -447,7 +442,10 @@ class FederationHandler(BaseHandler): logger.info( "[%s %s]: Requesting missing events between %s and %s", - room_id, event_id, shortstr(latest), event_id, + room_id, + event_id, + shortstr(latest), + event_id, ) # XXX: we set timeout to 10s to help workaround @@ -512,15 +510,15 @@ class FederationHandler(BaseHandler): # We failed to get the missing events, but since we need to handle # the case of `get_missing_events` not returning the necessary # events anyway, it is safe to simply log the error and continue. - logger.warn( - "[%s %s]: Failed to get prev_events: %s", - room_id, event_id, e, - ) + logger.warn("[%s %s]: Failed to get prev_events: %s", room_id, event_id, e) return logger.info( "[%s %s]: Got %d prev_events: %s", - room_id, event_id, len(missing_events), shortstr(missing_events), + room_id, + event_id, + len(missing_events), + shortstr(missing_events), ) # We want to sort these by depth so we process them and @@ -530,20 +528,20 @@ class FederationHandler(BaseHandler): for ev in missing_events: logger.info( "[%s %s] Handling received prev_event %s", - room_id, event_id, ev.event_id, + room_id, + event_id, + ev.event_id, ) with logcontext.nested_logging_context(ev.event_id): try: - yield self.on_receive_pdu( - origin, - ev, - sent_to_us_directly=False, - ) + yield self.on_receive_pdu(origin, ev, sent_to_us_directly=False) except FederationError as e: if e.code == 403: logger.warn( "[%s %s] Received prev_event %s failed history check.", - room_id, event_id, ev.event_id, + room_id, + event_id, + ev.event_id, ) else: raise @@ -556,10 +554,7 @@ class FederationHandler(BaseHandler): room_id = event.room_id event_id = event.event_id - logger.debug( - "[%s %s] Processing event: %s", - room_id, event_id, event, - ) + logger.debug("[%s %s] Processing event: %s", room_id, event_id, event) event_ids = set() if state: @@ -581,43 +576,32 @@ class FederationHandler(BaseHandler): e.internal_metadata.outlier = True auth_ids = e.auth_event_ids() auth = { - (e.type, e.state_key): e for e in auth_chain + (e.type, e.state_key): e + for e in auth_chain if e.event_id in auth_ids or e.type == EventTypes.Create } - event_infos.append({ - "event": e, - "auth_events": auth, - }) + event_infos.append({"event": e, "auth_events": auth}) seen_ids.add(e.event_id) logger.info( "[%s %s] persisting newly-received auth/state events %s", - room_id, event_id, [e["event"].event_id for e in event_infos] + room_id, + event_id, + [e["event"].event_id for e in event_infos], ) yield self._handle_new_events(origin, event_infos) try: - context = yield self._handle_new_event( - origin, - event, - state=state, - ) + context = yield self._handle_new_event(origin, event, state=state) except AuthError as e: - raise FederationError( - "ERROR", - e.code, - e.msg, - affected=event.event_id, - ) + raise FederationError("ERROR", e.code, e.msg, affected=event.event_id) room = yield self.store.get_room(room_id) if not room: try: yield self.store.store_room( - room_id=room_id, - room_creator_user_id="", - is_public=False, + room_id=room_id, room_creator_user_id="", is_public=False ) except StoreError: logger.exception("Failed to store room.") @@ -631,12 +615,10 @@ class FederationHandler(BaseHandler): prev_state_ids = yield context.get_prev_state_ids(self.store) - prev_state_id = prev_state_ids.get( - (event.type, event.state_key) - ) + prev_state_id = prev_state_ids.get((event.type, event.state_key)) if prev_state_id: prev_state = yield self.store.get_event( - prev_state_id, allow_none=True, + prev_state_id, allow_none=True ) if prev_state and prev_state.membership == Membership.JOIN: newly_joined = False @@ -667,10 +649,7 @@ class FederationHandler(BaseHandler): room_version = yield self.store.get_room_version(room_id) events = yield self.federation_client.backfill( - dest, - room_id, - limit=limit, - extremities=extremities, + dest, room_id, limit=limit, extremities=extremities ) # ideally we'd sanity check the events here for excess prev_events etc, @@ -697,16 +676,9 @@ class FederationHandler(BaseHandler): event_ids = set(e.event_id for e in events) - edges = [ - ev.event_id - for ev in events - if set(ev.prev_event_ids()) - event_ids - ] + edges = [ev.event_id for ev in events if set(ev.prev_event_ids()) - event_ids] - logger.info( - "backfill: Got %d events with %d edges", - len(events), len(edges), - ) + logger.info("backfill: Got %d events with %d edges", len(events), len(edges)) # For each edge get the current state. @@ -715,9 +687,7 @@ class FederationHandler(BaseHandler): events_to_state = {} for e_id in edges: state, auth = yield self.federation_client.get_state_for_room( - destination=dest, - room_id=room_id, - event_id=e_id + destination=dest, room_id=room_id, event_id=e_id ) auth_events.update({a.event_id: a for a in auth}) auth_events.update({s.event_id: s for s in state}) @@ -726,12 +696,14 @@ class FederationHandler(BaseHandler): required_auth = set( a_id - for event in events + list(state_events.values()) + list(auth_events.values()) + for event in events + + list(state_events.values()) + + list(auth_events.values()) for a_id in event.auth_event_ids() ) - auth_events.update({ - e_id: event_map[e_id] for e_id in required_auth if e_id in event_map - }) + auth_events.update( + {e_id: event_map[e_id] for e_id in required_auth if e_id in event_map} + ) missing_auth = required_auth - set(auth_events) failed_to_fetch = set() @@ -750,27 +722,30 @@ class FederationHandler(BaseHandler): if missing_auth - failed_to_fetch: logger.info( "Fetching missing auth for backfill: %r", - missing_auth - failed_to_fetch + missing_auth - failed_to_fetch, ) - results = yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - logcontext.run_in_background( - self.federation_client.get_pdu, - [dest], - event_id, - room_version=room_version, - outlier=True, - timeout=10000, - ) - for event_id in missing_auth - failed_to_fetch - ], - consumeErrors=True - )).addErrback(unwrapFirstError) + results = yield logcontext.make_deferred_yieldable( + defer.gatherResults( + [ + logcontext.run_in_background( + self.federation_client.get_pdu, + [dest], + event_id, + room_version=room_version, + outlier=True, + timeout=10000, + ) + for event_id in missing_auth - failed_to_fetch + ], + consumeErrors=True, + ) + ).addErrback(unwrapFirstError) auth_events.update({a.event_id: a for a in results if a}) required_auth.update( a_id - for event in results if event + for event in results + if event for a_id in event.auth_event_ids() ) missing_auth = required_auth - set(auth_events) @@ -802,15 +777,19 @@ class FederationHandler(BaseHandler): continue a.internal_metadata.outlier = True - ev_infos.append({ - "event": a, - "auth_events": { - (auth_events[a_id].type, auth_events[a_id].state_key): - auth_events[a_id] - for a_id in a.auth_event_ids() - if a_id in auth_events + ev_infos.append( + { + "event": a, + "auth_events": { + ( + auth_events[a_id].type, + auth_events[a_id].state_key, + ): auth_events[a_id] + for a_id in a.auth_event_ids() + if a_id in auth_events + }, } - }) + ) # Step 1b: persist the events in the chunk we fetched state for (i.e. # the backwards extremities) as non-outliers. @@ -818,23 +797,24 @@ class FederationHandler(BaseHandler): # For paranoia we ensure that these events are marked as # non-outliers ev = event_map[e_id] - assert(not ev.internal_metadata.is_outlier()) - - ev_infos.append({ - "event": ev, - "state": events_to_state[e_id], - "auth_events": { - (auth_events[a_id].type, auth_events[a_id].state_key): - auth_events[a_id] - for a_id in ev.auth_event_ids() - if a_id in auth_events + assert not ev.internal_metadata.is_outlier() + + ev_infos.append( + { + "event": ev, + "state": events_to_state[e_id], + "auth_events": { + ( + auth_events[a_id].type, + auth_events[a_id].state_key, + ): auth_events[a_id] + for a_id in ev.auth_event_ids() + if a_id in auth_events + }, } - }) + ) - yield self._handle_new_events( - dest, ev_infos, - backfilled=True, - ) + yield self._handle_new_events(dest, ev_infos, backfilled=True) # Step 2: Persist the rest of the events in the chunk one by one events.sort(key=lambda e: e.depth) @@ -845,14 +825,12 @@ class FederationHandler(BaseHandler): # For paranoia we ensure that these events are marked as # non-outliers - assert(not event.internal_metadata.is_outlier()) + assert not event.internal_metadata.is_outlier() # We store these one at a time since each event depends on the # previous to work out the state. # TODO: We can probably do something more clever here. - yield self._handle_new_event( - dest, event, backfilled=True, - ) + yield self._handle_new_event(dest, event, backfilled=True) defer.returnValue(events) @@ -861,9 +839,7 @@ class FederationHandler(BaseHandler): """Checks the database to see if we should backfill before paginating, and if so do. """ - extremities = yield self.store.get_oldest_events_with_depth_in_room( - room_id - ) + extremities = yield self.store.get_oldest_events_with_depth_in_room(room_id) if not extremities: logger.debug("Not backfilling as no extremeties found.") @@ -895,31 +871,27 @@ class FederationHandler(BaseHandler): # state *before* the event, ignoring the special casing certain event # types have. - forward_events = yield self.store.get_successor_events( - list(extremities), - ) + forward_events = yield self.store.get_successor_events(list(extremities)) extremities_events = yield self.store.get_events( - forward_events, - check_redacted=False, - get_prev_content=False, + forward_events, check_redacted=False, get_prev_content=False ) # We set `check_history_visibility_only` as we might otherwise get false # positives from users having been erased. filtered_extremities = yield filter_events_for_server( - self.store, self.server_name, list(extremities_events.values()), - redact=False, check_history_visibility_only=True, + self.store, + self.server_name, + list(extremities_events.values()), + redact=False, + check_history_visibility_only=True, ) if not filtered_extremities: defer.returnValue(False) # Check if we reached a point where we should start backfilling. - sorted_extremeties_tuple = sorted( - extremities.items(), - key=lambda e: -int(e[1]) - ) + sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1])) max_depth = sorted_extremeties_tuple[0][1] # We don't want to specify too many extremities as it causes the backfill @@ -928,8 +900,7 @@ class FederationHandler(BaseHandler): if current_depth > max_depth: logger.debug( - "Not backfilling as we don't need to. %d < %d", - max_depth, current_depth, + "Not backfilling as we don't need to. %d < %d", max_depth, current_depth ) return @@ -954,8 +925,7 @@ class FederationHandler(BaseHandler): joined_users = [ (state_key, int(event.depth)) for (e_type, state_key), event in iteritems(state) - if e_type == EventTypes.Member - and event.membership == Membership.JOIN + if e_type == EventTypes.Member and event.membership == Membership.JOIN ] joined_domains = {} @@ -975,8 +945,7 @@ class FederationHandler(BaseHandler): curr_domains = get_domains_from_state(curr_state) likely_domains = [ - domain for domain, depth in curr_domains - if domain != self.server_name + domain for domain, depth in curr_domains if domain != self.server_name ] @defer.inlineCallbacks @@ -985,28 +954,20 @@ class FederationHandler(BaseHandler): for dom in domains: try: yield self.backfill( - dom, room_id, - limit=100, - extremities=extremities, + dom, room_id, limit=100, extremities=extremities ) # If this succeeded then we probably already have the # appropriate stuff. # TODO: We can probably do something more intelligent here. defer.returnValue(True) except SynapseError as e: - logger.info( - "Failed to backfill from %s because %s", - dom, e, - ) + logger.info("Failed to backfill from %s because %s", dom, e) continue except CodeMessageException as e: if 400 <= e.code < 500: raise - logger.info( - "Failed to backfill from %s because %s", - dom, e, - ) + logger.info("Failed to backfill from %s because %s", dom, e) continue except NotRetryingDestination as e: logger.info(str(e)) @@ -1015,10 +976,7 @@ class FederationHandler(BaseHandler): logger.info(e) continue except Exception as e: - logger.exception( - "Failed to backfill from %s because %s", - dom, e, - ) + logger.exception("Failed to backfill from %s because %s", dom, e) continue defer.returnValue(False) @@ -1039,10 +997,11 @@ class FederationHandler(BaseHandler): resolve = logcontext.preserve_fn( self.state_handler.resolve_state_groups_for_events ) - states = yield logcontext.make_deferred_yieldable(defer.gatherResults( - [resolve(room_id, [e]) for e in event_ids], - consumeErrors=True, - )) + states = yield logcontext.make_deferred_yieldable( + defer.gatherResults( + [resolve(room_id, [e]) for e in event_ids], consumeErrors=True + ) + ) # dict[str, dict[tuple, str]], a map from event_id to state map of # event_ids. @@ -1050,23 +1009,23 @@ class FederationHandler(BaseHandler): state_map = yield self.store.get_events( [e_id for ids in itervalues(states) for e_id in itervalues(ids)], - get_prev_content=False + get_prev_content=False, ) states = { key: { k: state_map[e_id] for k, e_id in iteritems(state_dict) if e_id in state_map - } for key, state_dict in iteritems(states) + } + for key, state_dict in iteritems(states) } for e_id, _ in sorted_extremeties_tuple: likely_domains = get_domains_from_state(states[e_id]) - success = yield try_backfill([ - dom for dom, _ in likely_domains - if dom not in tried_domains - ]) + success = yield try_backfill( + [dom for dom, _ in likely_domains if dom not in tried_domains] + ) if success: defer.returnValue(True) @@ -1091,20 +1050,20 @@ class FederationHandler(BaseHandler): SynapseError if the event does not pass muster """ if len(ev.prev_event_ids()) > 20: - logger.warn("Rejecting event %s which has %i prev_events", - ev.event_id, len(ev.prev_event_ids())) - raise SynapseError( - http_client.BAD_REQUEST, - "Too many prev_events", + logger.warn( + "Rejecting event %s which has %i prev_events", + ev.event_id, + len(ev.prev_event_ids()), ) + raise SynapseError(http_client.BAD_REQUEST, "Too many prev_events") if len(ev.auth_event_ids()) > 10: - logger.warn("Rejecting event %s which has %i auth_events", - ev.event_id, len(ev.auth_event_ids())) - raise SynapseError( - http_client.BAD_REQUEST, - "Too many auth_events", + logger.warn( + "Rejecting event %s which has %i auth_events", + ev.event_id, + len(ev.auth_event_ids()), ) + raise SynapseError(http_client.BAD_REQUEST, "Too many auth_events") @defer.inlineCallbacks def send_invite(self, target_host, event): @@ -1116,7 +1075,7 @@ class FederationHandler(BaseHandler): destination=target_host, room_id=event.room_id, event_id=event.event_id, - pdu=event + pdu=event, ) defer.returnValue(pdu) @@ -1125,8 +1084,7 @@ class FederationHandler(BaseHandler): def on_event_auth(self, event_id): event = yield self.store.get_event(event_id) auth = yield self.store.get_auth_chain( - [auth_id for auth_id in event.auth_event_ids()], - include_given=True + [auth_id for auth_id in event.auth_event_ids()], include_given=True ) defer.returnValue([e for e in auth]) @@ -1152,15 +1110,13 @@ class FederationHandler(BaseHandler): joinee, "join", content, - params={ - "ver": KNOWN_ROOM_VERSIONS, - }, + params={"ver": KNOWN_ROOM_VERSIONS}, ) # This shouldn't happen, because the RoomMemberHandler has a # linearizer lock which only allows one operation per user per room # at a time - so this is just paranoia. - assert (room_id not in self.room_queues) + assert room_id not in self.room_queues self.room_queues[room_id] = [] @@ -1177,7 +1133,7 @@ class FederationHandler(BaseHandler): except ValueError: pass ret = yield self.federation_client.send_join( - target_hosts, event, event_format_version, + target_hosts, event, event_format_version ) origin = ret["origin"] @@ -1196,17 +1152,13 @@ class FederationHandler(BaseHandler): try: yield self.store.store_room( - room_id=room_id, - room_creator_user_id="", - is_public=False + room_id=room_id, room_creator_user_id="", is_public=False ) except Exception: # FIXME pass - yield self._persist_auth_tree( - origin, auth_chain, state, event - ) + yield self._persist_auth_tree(origin, auth_chain, state, event) logger.debug("Finished joining %s to %s", joinee, room_id) finally: @@ -1233,14 +1185,18 @@ class FederationHandler(BaseHandler): """ for p, origin in room_queue: try: - logger.info("Processing queued PDU %s which was received " - "while we were joining %s", p.event_id, p.room_id) + logger.info( + "Processing queued PDU %s which was received " + "while we were joining %s", + p.event_id, + p.room_id, + ) with logcontext.nested_logging_context(p.event_id): yield self.on_receive_pdu(origin, p, sent_to_us_directly=True) except Exception as e: logger.warn( - "Error handling queued PDU %s from %s: %s", - p.event_id, origin, e) + "Error handling queued PDU %s from %s: %s", p.event_id, origin, e + ) @defer.inlineCallbacks @log_function @@ -1261,30 +1217,30 @@ class FederationHandler(BaseHandler): "room_id": room_id, "sender": user_id, "state_key": user_id, - } + }, ) try: event, context = yield self.event_creation_handler.create_new_client_event( - builder=builder, + builder=builder ) except AuthError as e: logger.warn("Failed to create join %r because %s", event, e) raise e event_allowed = yield self.third_party_event_rules.check_event_allowed( - event, context, + event, context ) if not event_allowed: logger.info("Creation of join %s forbidden by third-party rules", event) raise SynapseError( - 403, "This event is not allowed in this context", Codes.FORBIDDEN, + 403, "This event is not allowed in this context", Codes.FORBIDDEN ) # The remote hasn't signed it yet, obviously. We'll do the full checks # when we get the event back in `on_send_join_request` yield self.auth.check_from_context( - room_version, event, context, do_sig_check=False, + room_version, event, context, do_sig_check=False ) defer.returnValue(event) @@ -1319,17 +1275,15 @@ class FederationHandler(BaseHandler): # would introduce the danger of backwards-compatibility problems. event.internal_metadata.send_on_behalf_of = origin - context = yield self._handle_new_event( - origin, event - ) + context = yield self._handle_new_event(origin, event) event_allowed = yield self.third_party_event_rules.check_event_allowed( - event, context, + event, context ) if not event_allowed: logger.info("Sending of join %s forbidden by third-party rules", event) raise SynapseError( - 403, "This event is not allowed in this context", Codes.FORBIDDEN, + 403, "This event is not allowed in this context", Codes.FORBIDDEN ) logger.debug( @@ -1350,10 +1304,7 @@ class FederationHandler(BaseHandler): state = yield self.store.get_events(list(prev_state_ids.values())) - defer.returnValue({ - "state": list(state.values()), - "auth_chain": auth_chain, - }) + defer.returnValue({"state": list(state.values()), "auth_chain": auth_chain}) @defer.inlineCallbacks def on_invite_request(self, origin, pdu): @@ -1374,7 +1325,7 @@ class FederationHandler(BaseHandler): raise SynapseError(403, "This server does not accept room invites") if not self.spam_checker.user_may_invite( - event.sender, event.state_key, event.room_id, + event.sender, event.state_key, event.room_id ): raise SynapseError( 403, "This user is not permitted to send invites to this server/user" @@ -1386,26 +1337,23 @@ class FederationHandler(BaseHandler): sender_domain = get_domain_from_id(event.sender) if sender_domain != origin: - raise SynapseError(400, "The invite event was not from the server sending it") + raise SynapseError( + 400, "The invite event was not from the server sending it" + ) if not self.is_mine_id(event.state_key): raise SynapseError(400, "The invite event must be for this server") # block any attempts to invite the server notices mxid if event.state_key == self._server_notices_mxid: - raise SynapseError( - http_client.FORBIDDEN, - "Cannot invite this user", - ) + raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user") event.internal_metadata.outlier = True event.internal_metadata.out_of_band_membership = True event.signatures.update( compute_event_signature( - event.get_pdu_json(), - self.hs.hostname, - self.hs.config.signing_key[0] + event.get_pdu_json(), self.hs.hostname, self.hs.config.signing_key[0] ) ) @@ -1417,10 +1365,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def do_remotely_reject_invite(self, target_hosts, room_id, user_id): origin, event, event_format_version = yield self._make_and_verify_event( - target_hosts, - room_id, - user_id, - "leave" + target_hosts, room_id, user_id, "leave" ) # Mark as outlier as we don't have any state for this event; we're not # even in the room. @@ -1435,10 +1380,7 @@ class FederationHandler(BaseHandler): except ValueError: pass - yield self.federation_client.send_leave( - target_hosts, - event - ) + yield self.federation_client.send_leave(target_hosts, event) context = yield self.state_handler.compute_event_context(event) yield self.persist_events_and_notify([(event, context)]) @@ -1446,25 +1388,21 @@ class FederationHandler(BaseHandler): defer.returnValue(event) @defer.inlineCallbacks - def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, - content={}, params=None): + def _make_and_verify_event( + self, target_hosts, room_id, user_id, membership, content={}, params=None + ): origin, event, format_ver = yield self.federation_client.make_membership_event( - target_hosts, - room_id, - user_id, - membership, - content, - params=params, + target_hosts, room_id, user_id, membership, content, params=params ) logger.debug("Got response to make_%s: %s", membership, event) # We should assert some things. # FIXME: Do this in a nicer way - assert(event.type == EventTypes.Member) - assert(event.user_id == user_id) - assert(event.state_key == user_id) - assert(event.room_id == room_id) + assert event.type == EventTypes.Member + assert event.user_id == user_id + assert event.state_key == user_id + assert event.room_id == room_id defer.returnValue((origin, event, format_ver)) @defer.inlineCallbacks @@ -1483,27 +1421,27 @@ class FederationHandler(BaseHandler): "room_id": room_id, "sender": user_id, "state_key": user_id, - } + }, ) event, context = yield self.event_creation_handler.create_new_client_event( - builder=builder, + builder=builder ) event_allowed = yield self.third_party_event_rules.check_event_allowed( - event, context, + event, context ) if not event_allowed: logger.warning("Creation of leave %s forbidden by third-party rules", event) raise SynapseError( - 403, "This event is not allowed in this context", Codes.FORBIDDEN, + 403, "This event is not allowed in this context", Codes.FORBIDDEN ) try: # The remote hasn't signed it yet, obviously. We'll do the full checks # when we get the event back in `on_send_leave_request` yield self.auth.check_from_context( - room_version, event, context, do_sig_check=False, + room_version, event, context, do_sig_check=False ) except AuthError as e: logger.warn("Failed to create new leave %r because %s", event, e) @@ -1525,17 +1463,15 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context = yield self._handle_new_event( - origin, event - ) + context = yield self._handle_new_event(origin, event) event_allowed = yield self.third_party_event_rules.check_event_allowed( - event, context, + event, context ) if not event_allowed: logger.info("Sending of leave %s forbidden by third-party rules", event) raise SynapseError( - 403, "This event is not allowed in this context", Codes.FORBIDDEN, + 403, "This event is not allowed in this context", Codes.FORBIDDEN ) logger.debug( @@ -1552,18 +1488,14 @@ class FederationHandler(BaseHandler): """ event = yield self.store.get_event( - event_id, allow_none=False, check_room_id=room_id, + event_id, allow_none=False, check_room_id=room_id ) - state_groups = yield self.store.get_state_groups( - room_id, [event_id] - ) + state_groups = yield self.store.get_state_groups(room_id, [event_id]) if state_groups: _, state = list(iteritems(state_groups)).pop() - results = { - (e.type, e.state_key): e for e in state - } + results = {(e.type, e.state_key): e for e in state} if event.is_state(): # Get previous state @@ -1585,12 +1517,10 @@ class FederationHandler(BaseHandler): """Returns the state at the event. i.e. not including said event. """ event = yield self.store.get_event( - event_id, allow_none=False, check_room_id=room_id, + event_id, allow_none=False, check_room_id=room_id ) - state_groups = yield self.store.get_state_groups_ids( - room_id, [event_id] - ) + state_groups = yield self.store.get_state_groups_ids(room_id, [event_id]) if state_groups: _, state = list(state_groups.items()).pop() @@ -1616,11 +1546,7 @@ class FederationHandler(BaseHandler): if not in_room: raise AuthError(403, "Host not in room.") - events = yield self.store.get_backfill_events( - room_id, - pdu_list, - limit - ) + events = yield self.store.get_backfill_events(room_id, pdu_list, limit) events = yield filter_events_for_server(self.store, origin, events) @@ -1644,22 +1570,15 @@ class FederationHandler(BaseHandler): AuthError if the server is not currently in the room """ event = yield self.store.get_event( - event_id, - allow_none=True, - allow_rejected=True, + event_id, allow_none=True, allow_rejected=True ) if event: - in_room = yield self.auth.check_host_in_room( - event.room_id, - origin - ) + in_room = yield self.auth.check_host_in_room(event.room_id, origin) if not in_room: raise AuthError(403, "Host not in room.") - events = yield filter_events_for_server( - self.store, origin, [event], - ) + events = yield filter_events_for_server(self.store, origin, [event]) event = events[0] defer.returnValue(event) else: @@ -1669,13 +1588,11 @@ class FederationHandler(BaseHandler): return self.store.get_min_depth(context) @defer.inlineCallbacks - def _handle_new_event(self, origin, event, state=None, auth_events=None, - backfilled=False): + def _handle_new_event( + self, origin, event, state=None, auth_events=None, backfilled=False + ): context = yield self._prep_event( - origin, event, - state=state, - auth_events=auth_events, - backfilled=backfilled, + origin, event, state=state, auth_events=auth_events, backfilled=backfilled ) # reraise does not allow inlineCallbacks to preserve the stacktrace, so we @@ -1688,15 +1605,13 @@ class FederationHandler(BaseHandler): ) yield self.persist_events_and_notify( - [(event, context)], - backfilled=backfilled, + [(event, context)], backfilled=backfilled ) success = True finally: if not success: logcontext.run_in_background( - self.store.remove_push_actions_from_staging, - event.event_id, + self.store.remove_push_actions_from_staging, event.event_id ) defer.returnValue(context) @@ -1724,12 +1639,15 @@ class FederationHandler(BaseHandler): ) defer.returnValue(res) - contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - logcontext.run_in_background(prep, ev_info) - for ev_info in event_infos - ], consumeErrors=True, - )) + contexts = yield logcontext.make_deferred_yieldable( + defer.gatherResults( + [ + logcontext.run_in_background(prep, ev_info) + for ev_info in event_infos + ], + consumeErrors=True, + ) + ) yield self.persist_events_and_notify( [ @@ -1764,8 +1682,7 @@ class FederationHandler(BaseHandler): events_to_context[e.event_id] = ctx event_map = { - e.event_id: e - for e in itertools.chain(auth_events, state, [event]) + e.event_id: e for e in itertools.chain(auth_events, state, [event]) } create_event = None @@ -1780,7 +1697,7 @@ class FederationHandler(BaseHandler): raise SynapseError(400, "No create event in state") room_version = create_event.content.get( - "room_version", RoomVersions.V1.identifier, + "room_version", RoomVersions.V1.identifier ) missing_auth_events = set() @@ -1791,11 +1708,7 @@ class FederationHandler(BaseHandler): for e_id in missing_auth_events: m_ev = yield self.federation_client.get_pdu( - [origin], - e_id, - room_version=room_version, - outlier=True, - timeout=10000, + [origin], e_id, room_version=room_version, outlier=True, timeout=10000 ) if m_ev and m_ev.event_id == e_id: event_map[e_id] = m_ev @@ -1820,10 +1733,7 @@ class FederationHandler(BaseHandler): # cause SynapseErrors in auth.check. We don't want to give up # the attempt to federate altogether in such cases. - logger.warn( - "Rejecting %s because %s", - e.event_id, err.msg - ) + logger.warn("Rejecting %s because %s", e.event_id, err.msg) if e == event: raise @@ -1833,16 +1743,14 @@ class FederationHandler(BaseHandler): [ (e, events_to_context[e.event_id]) for e in itertools.chain(auth_events, state) - ], + ] ) new_event_context = yield self.state_handler.compute_event_context( event, old_state=state ) - yield self.persist_events_and_notify( - [(event, new_event_context)], - ) + yield self.persist_events_and_notify([(event, new_event_context)]) @defer.inlineCallbacks def _prep_event(self, origin, event, state, auth_events, backfilled): @@ -1858,40 +1766,30 @@ class FederationHandler(BaseHandler): Returns: Deferred, which resolves to synapse.events.snapshot.EventContext """ - context = yield self.state_handler.compute_event_context( - event, old_state=state, - ) + context = yield self.state_handler.compute_event_context(event, old_state=state) if not auth_events: prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.auth.compute_auth_events( - event, prev_state_ids, for_verification=True, + event, prev_state_ids, for_verification=True ) auth_events = yield self.store.get_events(auth_events_ids) - auth_events = { - (e.type, e.state_key): e for e in auth_events.values() - } + auth_events = {(e.type, e.state_key): e for e in auth_events.values()} # This is a hack to fix some old rooms where the initial join event # didn't reference the create event in its auth events. if event.type == EventTypes.Member and not event.auth_event_ids(): if len(event.prev_event_ids()) == 1 and event.depth < 5: c = yield self.store.get_event( - event.prev_event_ids()[0], - allow_none=True, + event.prev_event_ids()[0], allow_none=True ) if c and c.type == EventTypes.Create: auth_events[(c.type, c.state_key)] = c try: - yield self.do_auth( - origin, event, context, auth_events=auth_events - ) + yield self.do_auth(origin, event, context, auth_events=auth_events) except AuthError as e: - logger.warn( - "[%s %s] Rejecting: %s", - event.room_id, event.event_id, e.msg - ) + logger.warn("[%s %s] Rejecting: %s", event.room_id, event.event_id, e.msg) context.rejected = RejectedReason.AUTH_ERROR @@ -1922,9 +1820,7 @@ class FederationHandler(BaseHandler): # "soft-fail" the event. do_soft_fail_check = not backfilled and not event.internal_metadata.is_outlier() if do_soft_fail_check: - extrem_ids = yield self.store.get_latest_event_ids_in_room( - event.room_id, - ) + extrem_ids = yield self.store.get_latest_event_ids_in_room(event.room_id) extrem_ids = set(extrem_ids) prev_event_ids = set(event.prev_event_ids()) @@ -1952,31 +1848,31 @@ class FederationHandler(BaseHandler): # like bans, especially with state res v2. state_sets = yield self.store.get_state_groups( - event.room_id, extrem_ids, + event.room_id, extrem_ids ) state_sets = list(state_sets.values()) state_sets.append(state) current_state_ids = yield self.state_handler.resolve_events( - room_version, state_sets, event, + room_version, state_sets, event ) current_state_ids = { k: e.event_id for k, e in iteritems(current_state_ids) } else: current_state_ids = yield self.state_handler.get_current_state_ids( - event.room_id, latest_event_ids=extrem_ids, + event.room_id, latest_event_ids=extrem_ids ) logger.debug( "Doing soft-fail check for %s: state %s", - event.event_id, current_state_ids, + event.event_id, + current_state_ids, ) # Now check if event pass auth against said current state auth_types = auth_types_for_event(event) current_state_ids = [ - e for k, e in iteritems(current_state_ids) - if k in auth_types + e for k, e in iteritems(current_state_ids) if k in auth_types ] current_auth_events = yield self.store.get_events(current_state_ids) @@ -1987,19 +1883,14 @@ class FederationHandler(BaseHandler): try: self.auth.check(room_version, event, auth_events=current_auth_events) except AuthError as e: - logger.warn( - "Soft-failing %r because %s", - event, e, - ) + logger.warn("Soft-failing %r because %s", event, e) event.internal_metadata.soft_failed = True @defer.inlineCallbacks - def on_query_auth(self, origin, event_id, room_id, remote_auth_chain, rejects, - missing): - in_room = yield self.auth.check_host_in_room( - room_id, - origin - ) + def on_query_auth( + self, origin, event_id, room_id, remote_auth_chain, rejects, missing + ): + in_room = yield self.auth.check_host_in_room(room_id, origin) if not in_room: raise AuthError(403, "Host not in room.") @@ -2017,28 +1908,23 @@ class FederationHandler(BaseHandler): # Now get the current auth_chain for the event. local_auth_chain = yield self.store.get_auth_chain( - [auth_id for auth_id in event.auth_event_ids()], - include_given=True + [auth_id for auth_id in event.auth_event_ids()], include_given=True ) # TODO: Check if we would now reject event_id. If so we need to tell # everyone. - ret = yield self.construct_auth_difference( - local_auth_chain, remote_auth_chain - ) + ret = yield self.construct_auth_difference(local_auth_chain, remote_auth_chain) logger.debug("on_query_auth returning: %s", ret) defer.returnValue(ret) @defer.inlineCallbacks - def on_get_missing_events(self, origin, room_id, earliest_events, - latest_events, limit): - in_room = yield self.auth.check_host_in_room( - room_id, - origin - ) + def on_get_missing_events( + self, origin, room_id, earliest_events, latest_events, limit + ): + in_room = yield self.auth.check_host_in_room(room_id, origin) if not in_room: raise AuthError(403, "Host not in room.") @@ -2052,7 +1938,7 @@ class FederationHandler(BaseHandler): ) missing_events = yield filter_events_for_server( - self.store, origin, missing_events, + self.store, origin, missing_events ) defer.returnValue(missing_events) @@ -2140,25 +2026,17 @@ class FederationHandler(BaseHandler): if missing_auth: # TODO: can we use store.have_seen_events here instead? - have_events = yield self.store.get_seen_events_with_rejections( - missing_auth - ) + have_events = yield self.store.get_seen_events_with_rejections(missing_auth) logger.debug("Got events %s from store", have_events) missing_auth.difference_update(have_events.keys()) else: have_events = {} - have_events.update({ - e.event_id: "" - for e in auth_events.values() - }) + have_events.update({e.event_id: "" for e in auth_events.values()}) if missing_auth: # If we don't have all the auth events, we need to get them. - logger.info( - "auth_events contains unknown events: %s", - missing_auth, - ) + logger.info("auth_events contains unknown events: %s", missing_auth) try: try: remote_auth_chain = yield self.federation_client.get_event_auth( @@ -2184,18 +2062,16 @@ class FederationHandler(BaseHandler): try: auth_ids = e.auth_event_ids() auth = { - (e.type, e.state_key): e for e in remote_auth_chain + (e.type, e.state_key): e + for e in remote_auth_chain if e.event_id in auth_ids or e.type == EventTypes.Create } e.internal_metadata.outlier = True logger.debug( - "do_auth %s missing_auth: %s", - event.event_id, e.event_id - ) - yield self._handle_new_event( - origin, e, auth_events=auth + "do_auth %s missing_auth: %s", event.event_id, e.event_id ) + yield self._handle_new_event(origin, e, auth_events=auth) if e.event_id in event_auth_events: auth_events[(e.type, e.state_key)] = e @@ -2231,35 +2107,36 @@ class FederationHandler(BaseHandler): room_version = yield self.store.get_room_version(event.room_id) different_events = yield logcontext.make_deferred_yieldable( - defer.gatherResults([ - logcontext.run_in_background( - self.store.get_event, - d, - allow_none=True, - allow_rejected=False, - ) - for d in different_auth - if d in have_events and not have_events[d] - ], consumeErrors=True) + defer.gatherResults( + [ + logcontext.run_in_background( + self.store.get_event, d, allow_none=True, allow_rejected=False + ) + for d in different_auth + if d in have_events and not have_events[d] + ], + consumeErrors=True, + ) ).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) remote_view = dict(auth_events) - remote_view.update({ - (d.type, d.state_key): d for d in different_events if d - }) + remote_view.update( + {(d.type, d.state_key): d for d in different_events if d} + ) new_state = yield self.state_handler.resolve_events( room_version, [list(local_view.values()), list(remote_view.values())], - event + event, ) logger.info( "After state res: updating auth_events with new state %s", { - (d.type, d.state_key): d.event_id for d in new_state.values() + (d.type, d.state_key): d.event_id + for d in new_state.values() if auth_events.get((d.type, d.state_key)) != d }, ) @@ -2271,7 +2148,7 @@ class FederationHandler(BaseHandler): ) yield self._update_context_for_auth_events( - event, context, auth_events, event_key, + event, context, auth_events, event_key ) if not different_auth: @@ -2305,21 +2182,14 @@ class FederationHandler(BaseHandler): prev_state_ids = yield context.get_prev_state_ids(self.store) # 1. Get what we think is the auth chain. - auth_ids = yield self.auth.compute_auth_events( - event, prev_state_ids - ) - local_auth_chain = yield self.store.get_auth_chain( - auth_ids, include_given=True - ) + auth_ids = yield self.auth.compute_auth_events(event, prev_state_ids) + local_auth_chain = yield self.store.get_auth_chain(auth_ids, include_given=True) try: # 2. Get remote difference. try: result = yield self.federation_client.query_auth( - origin, - event.room_id, - event.event_id, - local_auth_chain, + origin, event.room_id, event.event_id, local_auth_chain ) except RequestSendFailed as e: # The other side isn't around or doesn't implement the @@ -2344,19 +2214,15 @@ class FederationHandler(BaseHandler): auth = { (e.type, e.state_key): e for e in result["auth_chain"] - if e.event_id in auth_ids - or event.type == EventTypes.Create + if e.event_id in auth_ids or event.type == EventTypes.Create } ev.internal_metadata.outlier = True logger.debug( - "do_auth %s different_auth: %s", - event.event_id, e.event_id + "do_auth %s different_auth: %s", event.event_id, e.event_id ) - yield self._handle_new_event( - origin, ev, auth_events=auth - ) + yield self._handle_new_event(origin, ev, auth_events=auth) if ev.event_id in event_auth_events: auth_events[(ev.type, ev.state_key)] = ev @@ -2371,12 +2237,11 @@ class FederationHandler(BaseHandler): # TODO. yield self._update_context_for_auth_events( - event, context, auth_events, event_key, + event, context, auth_events, event_key ) @defer.inlineCallbacks - def _update_context_for_auth_events(self, event, context, auth_events, - event_key): + def _update_context_for_auth_events(self, event, context, auth_events, event_key): """Update the state_ids in an event context after auth event resolution, storing the changes as a new state group. @@ -2393,8 +2258,7 @@ class FederationHandler(BaseHandler): this will not be included in the current_state in the context. """ state_updates = { - k: a.event_id for k, a in iteritems(auth_events) - if k != event_key + k: a.event_id for k, a in iteritems(auth_events) if k != event_key } current_state_ids = yield context.get_current_state_ids(self.store) current_state_ids = dict(current_state_ids) @@ -2404,9 +2268,7 @@ class FederationHandler(BaseHandler): prev_state_ids = yield context.get_prev_state_ids(self.store) prev_state_ids = dict(prev_state_ids) - prev_state_ids.update({ - k: a.event_id for k, a in iteritems(auth_events) - }) + prev_state_ids.update({k: a.event_id for k, a in iteritems(auth_events)}) # create a new state group as a delta from the existing one. prev_group = context.state_group @@ -2555,30 +2417,23 @@ class FederationHandler(BaseHandler): logger.debug("construct_auth_difference returning") - defer.returnValue({ - "auth_chain": local_auth, - "rejects": { - e.event_id: { - "reason": reason_map[e.event_id], - "proof": None, - } - for e in base_remote_rejected - }, - "missing": [e.event_id for e in missing_locals], - }) + defer.returnValue( + { + "auth_chain": local_auth, + "rejects": { + e.event_id: {"reason": reason_map[e.event_id], "proof": None} + for e in base_remote_rejected + }, + "missing": [e.event_id for e in missing_locals], + } + ) @defer.inlineCallbacks @log_function def exchange_third_party_invite( - self, - sender_user_id, - target_user_id, - room_id, - signed, + self, sender_user_id, target_user_id, room_id, signed ): - third_party_invite = { - "signed": signed, - } + third_party_invite = {"signed": signed} event_dict = { "type": EventTypes.Member, @@ -2601,7 +2456,7 @@ class FederationHandler(BaseHandler): ) event_allowed = yield self.third_party_event_rules.check_event_allowed( - event, context, + event, context ) if not event_allowed: logger.info( @@ -2609,7 +2464,7 @@ class FederationHandler(BaseHandler): event, ) raise SynapseError( - 403, "This event is not allowed in this context", Codes.FORBIDDEN, + 403, "This event is not allowed in this context", Codes.FORBIDDEN ) event, context = yield self.add_display_name_to_third_party_invite( @@ -2634,9 +2489,7 @@ class FederationHandler(BaseHandler): else: destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id)) yield self.federation_client.forward_third_party_invite( - destinations, - room_id, - event_dict, + destinations, room_id, event_dict ) @defer.inlineCallbacks @@ -2657,19 +2510,18 @@ class FederationHandler(BaseHandler): builder = self.event_builder_factory.new(room_version, event_dict) event, context = yield self.event_creation_handler.create_new_client_event( - builder=builder, + builder=builder ) event_allowed = yield self.third_party_event_rules.check_event_allowed( - event, context, + event, context ) if not event_allowed: logger.warning( - "Exchange of threepid invite %s forbidden by third-party rules", - event, + "Exchange of threepid invite %s forbidden by third-party rules", event ) raise SynapseError( - 403, "This event is not allowed in this context", Codes.FORBIDDEN, + 403, "This event is not allowed in this context", Codes.FORBIDDEN ) event, context = yield self.add_display_name_to_third_party_invite( @@ -2691,11 +2543,12 @@ class FederationHandler(BaseHandler): yield member_handler.send_membership_event(None, event, context) @defer.inlineCallbacks - def add_display_name_to_third_party_invite(self, room_version, event_dict, - event, context): + def add_display_name_to_third_party_invite( + self, room_version, event_dict, event, context + ): key = ( EventTypes.ThirdPartyInvite, - event.content["third_party_invite"]["signed"]["token"] + event.content["third_party_invite"]["signed"]["token"], ) original_invite = None prev_state_ids = yield context.get_prev_state_ids(self.store) @@ -2709,8 +2562,7 @@ class FederationHandler(BaseHandler): event_dict["content"]["third_party_invite"]["display_name"] = display_name else: logger.info( - "Could not find invite event for third_party_invite: %r", - event_dict + "Could not find invite event for third_party_invite: %r", event_dict ) # We don't discard here as this is not the appropriate place to do # auth checks. If we need the invite and don't have it then the @@ -2719,7 +2571,7 @@ class FederationHandler(BaseHandler): builder = self.event_builder_factory.new(room_version, event_dict) EventValidator().validate_builder(builder) event, context = yield self.event_creation_handler.create_new_client_event( - builder=builder, + builder=builder ) EventValidator().validate_new(event) defer.returnValue((event, context)) @@ -2743,9 +2595,7 @@ class FederationHandler(BaseHandler): token = signed["token"] prev_state_ids = yield context.get_prev_state_ids(self.store) - invite_event_id = prev_state_ids.get( - (EventTypes.ThirdPartyInvite, token,) - ) + invite_event_id = prev_state_ids.get((EventTypes.ThirdPartyInvite, token)) invite_event = None if invite_event_id: @@ -2769,38 +2619,42 @@ class FederationHandler(BaseHandler): logger.debug( "Attempting to verify sig with key %s from %r " "against pubkey %r", - key_name, server, public_key_object, + key_name, + server, + public_key_object, ) try: public_key = public_key_object["public_key"] verify_key = decode_verify_key_bytes( - key_name, - decode_base64(public_key) + key_name, decode_base64(public_key) ) verify_signed_json(signed, server, verify_key) logger.debug( "Successfully verified sig with key %s from %r " "against pubkey %r", - key_name, server, public_key_object, + key_name, + server, + public_key_object, ) except Exception: logger.info( "Failed to verify sig with key %s from %r " "against pubkey %r", - key_name, server, public_key_object, + key_name, + server, + public_key_object, ) raise try: if "key_validity_url" in public_key_object: yield self._check_key_revocation( - public_key, - public_key_object["key_validity_url"] + public_key, public_key_object["key_validity_url"] ) except Exception: logger.info( "Failed to query key_validity_url %s", - public_key_object["key_validity_url"] + public_key_object["key_validity_url"], ) raise return @@ -2823,15 +2677,9 @@ class FederationHandler(BaseHandler): for revocation. """ try: - response = yield self.http_client.get_json( - url, - {"public_key": public_key} - ) + response = yield self.http_client.get_json(url, {"public_key": public_key}) except Exception: - raise SynapseError( - 502, - "Third party certificate could not be checked" - ) + raise SynapseError(502, "Third party certificate could not be checked") if "valid" not in response or not response["valid"]: raise AuthError(403, "Third party certificate was invalid") @@ -2852,12 +2700,11 @@ class FederationHandler(BaseHandler): yield self._send_events_to_master( store=self.store, event_and_contexts=event_and_contexts, - backfilled=backfilled + backfilled=backfilled, ) else: max_stream_id = yield self.store.persist_events( - event_and_contexts, - backfilled=backfilled, + event_and_contexts, backfilled=backfilled ) if not backfilled: # Never notify for backfilled events @@ -2891,13 +2738,10 @@ class FederationHandler(BaseHandler): event_stream_id = event.internal_metadata.stream_ordering self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users + event, event_stream_id, max_stream_id, extra_users=extra_users ) - return self.pusher_pool.on_new_notifications( - event_stream_id, max_stream_id, - ) + return self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) def _clean_room_for_join(self, room_id): """Called to clean up any data in DB for a given room, ready for the @@ -2916,9 +2760,7 @@ class FederationHandler(BaseHandler): """ if self.config.worker_app: return self._notify_user_membership_change( - room_id=room_id, - user_id=user.to_string(), - change="joined", + room_id=room_id, user_id=user.to_string(), change="joined" ) else: return user_joined_room(self.distributor, user, room_id) |