diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/e2e_keys.py | 2 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 142 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 2 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 22 |
4 files changed, 116 insertions, 52 deletions
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 578e9250fb..9dc46aa15f 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -341,7 +341,7 @@ class E2eKeysHandler(object): def _exception_to_failure(e): if isinstance(e, CodeMessageException): return { - "status": e.code, "message": e.message, + "status": e.code, "message": str(e), } if isinstance(e, NotRetryingDestination): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8d6bd7976d..38bebbf598 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -136,7 +136,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def on_receive_pdu( - self, origin, pdu, get_missing=True, sent_to_us_directly=False, + self, origin, pdu, sent_to_us_directly=False, ): """ Process a PDU received via a federation /send/ transaction, or via backfill of missing prev_events @@ -145,7 +145,8 @@ class FederationHandler(BaseHandler): origin (str): server which initiated the /send/ transaction. Will be used to fetch missing events or state. pdu (FrozenEvent): received PDU - get_missing (bool): True if we should fetch missing prev_events + sent_to_us_directly (bool): True if this event was pushed to us; False if + we pulled it as the result of a missing prev_event. Returns (Deferred): completes with None """ @@ -250,7 +251,7 @@ class FederationHandler(BaseHandler): pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: missing_prevs = prevs - seen - if get_missing and missing_prevs: + if sent_to_us_directly and missing_prevs: # If we're missing stuff, ensure we only fetch stuff one # at a time. logger.info( @@ -282,24 +283,46 @@ class FederationHandler(BaseHandler): room_id, event_id, len(missing_prevs), shortstr(missing_prevs), ) - if sent_to_us_directly and prevs - seen: - # If they have sent it to us directly, and the server - # isn't telling us about the auth events that it's - # made a message referencing, we explode - logger.warn( - "[%s %s] Failed to fetch %d prev events: rejecting", - room_id, event_id, len(prevs - seen), - ) - raise FederationError( - "ERROR", - 403, - ( - "Your server isn't divulging details about prev_events " - "referenced in this event." - ), - affected=pdu.event_id, - ) - elif prevs - seen: + if prevs - seen: + # We've still not been able to get all of the prev_events for this event. + # + # In this case, we need to fall back to asking another server in the + # federation for the state at this event. That's ok provided we then + # resolve the state against other bits of the DAG before using it (which + # will ensure that you can't just take over a room by sending an event, + # withholding its prev_events, and declaring yourself to be an admin in + # the subsequent state request). + # + # Now, if we're pulling this event as a missing prev_event, then clearly + # this event is not going to become the only forward-extremity and we are + # guaranteed to resolve its state against our existing forward + # extremities, so that should be fine. + # + # On the other hand, if this event was pushed to us, it is possible for + # it to become the only forward-extremity in the room, and we would then + # trust its state to be the state for the whole room. This is very bad. + # Further, if the event was pushed to us, there is no excuse for us not to + # have all the prev_events. We therefore reject any such events. + # + # XXX this really feels like it could/should be merged with the above, + # but there is an interaction with min_depth that I'm not really + # following. + + if sent_to_us_directly: + logger.warn( + "[%s %s] Failed to fetch %d prev events: rejecting", + room_id, event_id, len(prevs - seen), + ) + raise FederationError( + "ERROR", + 403, + ( + "Your server isn't divulging details about prev_events " + "referenced in this event." + ), + affected=pdu.event_id, + ) + # Calculate the state of the previous events, and # de-conflict them to find the current state. state_groups = [] @@ -316,14 +339,26 @@ class FederationHandler(BaseHandler): "[%s %s] Requesting state at missing prev_event %s", room_id, event_id, p, ) - state, got_auth_chain = ( - yield self.federation_client.get_state_for_room( - origin, room_id, p, + + with logcontext.nested_logging_context(p): + # note that if any of the missing prevs share missing state or + # auth events, the requests to fetch those events are deduped + # by the get_pdu_cache in federation_client. + remote_state, got_auth_chain = ( + yield self.federation_client.get_state_for_room( + origin, room_id, p, + ) ) - ) - auth_chains.update(got_auth_chain) - state_group = {(x.type, x.state_key): x.event_id for x in state} - state_groups.append(state_group) + + # XXX hrm I'm not convinced that duplicate events will compare + # for equality, so I'm not sure this does what the author + # hoped. + auth_chains.update(got_auth_chain) + + state_group = { + (x.type, x.state_key): x.event_id for x in remote_state + } + state_groups.append(state_group) # Resolve any conflicting state def fetch(ev_ids): @@ -460,20 +495,21 @@ class FederationHandler(BaseHandler): "[%s %s] Handling received prev_event %s", room_id, event_id, ev.event_id, ) - try: - yield self.on_receive_pdu( - origin, - ev, - get_missing=False - ) - except FederationError as e: - if e.code == 403: - logger.warn( - "[%s %s] Received prev_event %s failed history check.", - room_id, event_id, ev.event_id, + with logcontext.nested_logging_context(ev.event_id): + try: + yield self.on_receive_pdu( + origin, + ev, + sent_to_us_directly=False, ) - else: - raise + except FederationError as e: + if e.code == 403: + logger.warn( + "[%s %s] Received prev_event %s failed history check.", + room_id, event_id, ev.event_id, + ) + else: + raise @defer.inlineCallbacks def _process_received_pdu(self, origin, event, state, auth_chain): @@ -549,6 +585,10 @@ class FederationHandler(BaseHandler): }) seen_ids.add(e.event_id) + logger.info( + "[%s %s] persisting newly-received auth/state events %s", + room_id, event_id, [e["event"].event_id for e in event_infos] + ) yield self._handle_new_events(origin, event_infos) try: @@ -1112,7 +1152,8 @@ class FederationHandler(BaseHandler): try: logger.info("Processing queued PDU %s which was received " "while we were joining %s", p.event_id, p.room_id) - yield self.on_receive_pdu(origin, p) + with logcontext.nested_logging_context(p.event_id): + yield self.on_receive_pdu(origin, p, sent_to_us_directly=True) except Exception as e: logger.warn( "Error handling queued PDU %s from %s: %s", @@ -1558,15 +1599,22 @@ class FederationHandler(BaseHandler): Notifies about the events where appropriate. """ - contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - logcontext.run_in_background( - self._prep_event, + + @defer.inlineCallbacks + def prep(ev_info): + event = ev_info["event"] + with logcontext.nested_logging_context(suffix=event.event_id): + res = yield self._prep_event( origin, - ev_info["event"], + event, state=ev_info.get("state"), auth_events=ev_info.get("auth_events"), ) + defer.returnValue(res) + + contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( + [ + logcontext.run_in_background(prep, ev_info) for ev_info in event_infos ], consumeErrors=True, )) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 75b8b7ce6a..f284d5a385 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -278,7 +278,7 @@ class BaseProfileHandler(BaseHandler): except Exception as e: logger.warn( "Failed to update join event for room %s - %s", - room_id, str(e.message) + room_id, str(e) ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 2d2d3d5a0d..65f475d639 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -20,6 +20,7 @@ from twisted.internet import defer from synapse.api.errors import AuthError, SynapseError from synapse.types import UserID, get_domain_from_id +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer @@ -68,6 +69,11 @@ class TypingHandler(object): # map room IDs to sets of users currently typing self._room_typing = {} + # caches which room_ids changed at which serials + self._typing_stream_change_cache = StreamChangeCache( + "TypingStreamChangeCache", self._latest_room_serial, + ) + self.clock.looping_call( self._handle_timeouts, 5000, @@ -274,19 +280,29 @@ class TypingHandler(object): self._latest_room_serial += 1 self._room_serials[member.room_id] = self._latest_room_serial + self._typing_stream_change_cache.entity_has_changed( + member.room_id, self._latest_room_serial, + ) self.notifier.on_new_event( "typing_key", self._latest_room_serial, rooms=[member.room_id] ) def get_all_typing_updates(self, last_id, current_id): - # TODO: Work out a way to do this without scanning the entire state. if last_id == current_id: return [] + changed_rooms = self._typing_stream_change_cache.get_all_entities_changed( + last_id, + ) + + if changed_rooms is None: + changed_rooms = self._room_serials + rows = [] - for room_id, serial in self._room_serials.items(): - if last_id < serial and serial <= current_id: + for room_id in changed_rooms: + serial = self._room_serials[room_id] + if last_id < serial <= current_id: typing = self._room_typing[room_id] rows.append((serial, room_id, list(typing))) rows.sort() |