diff options
author | Eric Eastwood <erice@element.io> | 2022-09-24 04:16:56 -0500 |
---|---|---|
committer | Eric Eastwood <erice@element.io> | 2022-09-24 04:16:56 -0500 |
commit | 78b44340d61ebcd6d9a53f4ccadc8d634100ab55 (patch) | |
tree | 4cf9d3d1cc2beb41d278570501d77f8c6db2ad59 | |
parent | Debugging (diff) | |
download | synapse-78b44340d61ebcd6d9a53f4ccadc8d634100ab55.tar.xz |
More debugging
-rw-r--r-- | synapse/handlers/federation_event.py | 46 | ||||
-rw-r--r-- | synapse/state/__init__.py | 104 |
2 files changed, 142 insertions, 8 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 03c3ab81b9..ae438e094d 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -178,6 +178,7 @@ class FederationEventHandler: self._room_pdu_linearizer = Linearizer("fed_room_pdu") + @trace async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None: """Process a PDU received via a federation /send/ transaction @@ -663,11 +664,12 @@ class FederationEventHandler: logger.info( "backfill assumed reverse_chronological_events=%s", [ - "event_id=%s,depth=%d,body=%s,prevs=%s\n" + "event_id=%s,depth=%d,body=%s(%s),prevs=%s\n" % ( event.event_id, event.depth, event.content.get("body", event.type), + getattr(event, "state_key", None), event.prev_event_ids(), ) for event in reverse_chronological_events @@ -677,11 +679,12 @@ class FederationEventHandler: # logger.info( # "backfill chronological_events=%s", # [ - # "event_id=%s,depth=%d,body=%s,prevs=%s\n" + # "event_id=%s,depth=%d,body=%s(%s),prevs=%s\n" # % ( # event.event_id, # event.depth, # event.content.get("body", event.type), + # getattr(event, "state_key", None), # event.prev_event_ids(), # ) # for event in chronological_events @@ -712,8 +715,8 @@ class FederationEventHandler: # Expecting to persist in chronological order here (oldest -> # newest) so that events are persisted before they're referenced # as a `prev_event`. - # chronological_events, - reverse_chronological_events, + chronological_events, + # reverse_chronological_events, backfilled=True, ) @@ -848,11 +851,12 @@ class FederationEventHandler: "processing pulled backfilled=%s events=%s", backfilled, [ - "event_id=%s,depth=%d,body=%s,prevs=%s\n" + "event_id=%s,depth=%d,body=%s(%s),prevs=%s\n" % ( event.event_id, event.depth, event.content.get("body", event.type), + getattr(event, "state_key", None), event.prev_event_ids(), ) for event in events @@ -866,11 +870,12 @@ class FederationEventHandler: logger.info( "backfill sorted_events=%s", [ - "event_id=%s,depth=%d,body=%s,prevs=%s\n" + "event_id=%s,depth=%d,body=%s(%s),prevs=%s\n" % ( event.event_id, event.depth, event.content.get("body", event.type), + getattr(event, "state_key", None), event.prev_event_ids(), ) for event in sorted_events @@ -1872,6 +1877,12 @@ class FederationEventHandler: # already have checked we have all the auth events, in # _load_or_fetch_auth_events_for_event above) if context.partial_state: + logger.info( + "_check_event_auth(event=%s) with partial_state - %s (%s)", + event.event_id, + event.content.get("body", event.type), + getattr(event, "state_key", None), + ) room_version = await self._store.get_room_version_id(event.room_id) local_state_id_map = await context.get_prev_state_ids() @@ -1889,15 +1900,38 @@ class FederationEventHandler: ) ) else: + logger.info( + "_check_event_auth(event=%s) with full state - %s (%s)", + event.event_id, + event.content.get("body", event.type), + getattr(event, "state_key", None), + ) event_types = event_auth.auth_types_for_event(event.room_version, event) state_for_auth_id_map = await context.get_prev_state_ids( StateFilter.from_types(event_types) ) + logger.info( + "_check_event_auth(event=%s) state_for_auth_id_map=%s - %s (%s)", + event.event_id, + state_for_auth_id_map, + event.content.get("body", event.type), + getattr(event, "state_key", None), + ) + calculated_auth_event_ids = self._event_auth_handler.compute_auth_events( event, state_for_auth_id_map, for_verification=True ) + logger.info( + "_check_event_auth(event=%s) claimed_auth_events=%s calculated_auth_event_ids=%s - %s (%s)", + event.event_id, + event.auth_event_ids(), + calculated_auth_event_ids, + event.content.get("body", event.type), + getattr(event, "state_key", None), + ) + # if those are the same, we're done here. if collections.Counter(event.auth_event_ids()) == collections.Counter( calculated_auth_event_ids diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 3787d35b24..3c535b9a6c 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -36,11 +36,13 @@ import attr from frozendict import frozendict from prometheus_client import Counter, Histogram +from synapse import event_auth from synapse.api.constants import EventTypes from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.logging.context import ContextResourceUsage +from synapse.logging.tracing import SynapseTags, log_kv, trace, tag_args, set_attribute from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet from synapse.state import v1, v2 from synapse.storage.databases.main.events_worker import EventRedactBehaviour @@ -250,6 +252,8 @@ class StateHandler: state = await entry.get_state(self._state_storage_controller, StateFilter.all()) return await self.store.get_joined_hosts(room_id, state, entry) + @trace + @tag_args async def compute_event_context( self, event: EventBase, @@ -282,6 +286,14 @@ class StateHandler: RuntimeError if `state_ids_before_event` is not provided and one or more prev events are missing or outliers. """ + set_attribute( + SynapseTags.RESULT_PREFIX + "event_type_and_state", + f"{event.type} - {getattr(event, 'state_key', None)}", + ) + set_attribute( + SynapseTags.RESULT_PREFIX + "event_body", + event.content.get("body", None), + ) assert not event.internal_metadata.is_outlier() @@ -289,6 +301,14 @@ class StateHandler: # first of all, figure out the state before the event, unless we # already have it. # + logger.info( + "compute_event_context(event=%s, state_ids_before_event=%s) - %s (%s)", + event.event_id, + state_ids_before_event, + event.content.get("body", event.type), + getattr(event, "state_key", None), + ) + if state_ids_before_event: # if we're given the state before the event, then we use that state_group_before_event_prev_group = None @@ -304,6 +324,12 @@ class StateHandler: current_state_ids=state_ids_before_event, ) ) + log_kv( + { + "message": "Using state before event because `state_ids_before_event` was given:", + "state_group_before_event": state_group_before_event, + } + ) # the partial_state flag must be provided assert partial_state is not None @@ -324,7 +350,7 @@ class StateHandler: ) partial_state = any(incomplete_prev_events.values()) if partial_state: - logger.debug( + logger.info( "New/incoming event %s refers to prev_events %s with partial state", event.event_id, [k for (k, v) in incomplete_prev_events.items() if v], @@ -343,6 +369,24 @@ class StateHandler: deltas_to_state_group_before_event = entry.delta_ids state_ids_before_event = None + logger.info( + "compute_event_context(event=%s) entry.state_group=%s state_group_before_event_prev_group=%s deltas_to_state_group_before_event=%s - %s (%s)", + event.event_id, + entry.state_group, + state_group_before_event_prev_group, + deltas_to_state_group_before_event, + event.content.get("body", event.type), + getattr(event, "state_key", None), + ) + log_kv( + { + "message": "resolve_state_groups_for_events", + "entry.state_group": entry.state_group, + "state_group_before_event_prev_group": state_group_before_event_prev_group, + "deltas_to_state_group_before_event": deltas_to_state_group_before_event, + } + ) + # We make sure that we have a state group assigned to the state. if entry.state_group is None: # store_state_group requires us to have either a previous state group @@ -352,6 +396,12 @@ class StateHandler: state_ids_before_event = await entry.get_state( self._state_storage_controller, StateFilter.all() ) + log_kv( + { + "message": "state_group_before_event_prev_group was None so get state_ids_before_event", + "state_ids_before_event": state_ids_before_event, + } + ) state_group_before_event = ( await self._state_storage_controller.store_state_group( @@ -363,15 +413,27 @@ class StateHandler: ) ) entry.set_state_group(state_group_before_event) + log_kv( + { + "message": "entry.set_state_group(state_group_before_event)", + "state_group_before_event": state_group_before_event, + } + ) else: state_group_before_event = entry.state_group + log_kv( + { + "message": "Entry already has a state_group", + "state_group_before_event": state_group_before_event, + } + ) # # now if it's not a state event, we're done # if not event.is_state(): - return EventContext.with_state( + event_context = EventContext.with_state( storage=self._storage_controllers, state_group_before_event=state_group_before_event, state_group=state_group_before_event, @@ -381,6 +443,22 @@ class StateHandler: partial_state=partial_state, ) + state_for_auth_id_map = await event_context.get_prev_state_ids( + StateFilter.from_types( + event_auth.auth_types_for_event(event.room_version, event) + ) + ) + log_kv( + { + "message": "Done creating context for non-state event", + "state_for_auth_id_map from event_context": str( + state_for_auth_id_map + ), + } + ) + + return event_context + # # otherwise, we'll need to create a new state group for after the event # @@ -421,6 +499,7 @@ class StateHandler: ) @measure_func() + @trace async def resolve_state_groups_for_events( self, room_id: str, event_ids: Collection[str], await_full_state: bool = True ) -> _StateCacheEntry: @@ -448,6 +527,8 @@ class StateHandler: state_group_ids = state_groups.values() + log_kv({"state_group_ids": state_group_ids, "state_groups": state_groups}) + # check if each event has same state group id, if so there's no state to resolve state_group_ids_set = set(state_group_ids) if len(state_group_ids_set) == 1: @@ -458,6 +539,13 @@ class StateHandler: ) = await self._state_storage_controller.get_state_group_delta( state_group_id ) + log_kv( + { + "message": "Returning state_group_id", + "state_group_id": state_group_id, + "prev_group": prev_group, + } + ) return _StateCacheEntry( state=None, state_group=state_group_id, @@ -465,6 +553,11 @@ class StateHandler: delta_ids=delta_ids, ) elif len(state_group_ids_set) == 0: + log_kv( + { + "message": "Returning empty state group since there are no state_group_ids", + } + ) return _StateCacheEntry(state={}, state_group=None) room_version = await self.store.get_room_version_id(room_id) @@ -480,6 +573,13 @@ class StateHandler: None, state_res_store=StateResolutionStore(self.store), ) + log_kv( + { + "message": "Resolving state groups and returning result", + "state_to_resolve": state_to_resolve, + "result": result, + } + ) return result async def update_current_state(self, room_id: str) -> None: |