diff options
Diffstat (limited to 'synapse/handlers/federation_event.py')
-rw-r--r-- | synapse/handlers/federation_event.py | 180 |
1 files changed, 100 insertions, 80 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 9f055f00cf..69f8287b2b 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -36,6 +36,7 @@ from synapse import event_auth from synapse.api.constants import ( EventContentFields, EventTypes, + GuestAccess, Membership, RejectedReason, RoomEncryptionAlgorithms, @@ -53,7 +54,6 @@ from synapse.event_auth import auth_types_for_event from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.federation.federation_client import InvalidResponseError -from synapse.handlers._base import BaseHandler from synapse.logging.context import ( make_deferred_yieldable, nested_logging_context, @@ -116,7 +116,7 @@ class _NewEventInfo: claimed_auth_event_map: StateMap[EventBase] -class FederationEventHandler(BaseHandler): +class FederationEventHandler: """Handles events that originated from federation. Responsible for handing incoming events and passing them on to the rest @@ -124,26 +124,28 @@ class FederationEventHandler(BaseHandler): """ def __init__(self, hs: "HomeServer"): - super().__init__(hs) + self._store = hs.get_datastore() + self._storage = hs.get_storage() + self._state_store = self._storage.state - self.store = hs.get_datastore() - self.storage = hs.get_storage() - self.state_store = self.storage.state - - self.state_handler = hs.get_state_handler() - self.event_creation_handler = hs.get_event_creation_handler() + self._state_handler = hs.get_state_handler() + self._event_creation_handler = hs.get_event_creation_handler() self._event_auth_handler = hs.get_event_auth_handler() self._message_handler = hs.get_message_handler() - self.action_generator = hs.get_action_generator() + self._action_generator = hs.get_action_generator() self._state_resolution_handler = hs.get_state_resolution_handler() + # avoid a circular dependency by deferring execution here + self._get_room_member_handler = hs.get_room_member_handler - self.federation_client = hs.get_federation_client() - self.third_party_event_rules = hs.get_third_party_event_rules() + self._federation_client = hs.get_federation_client() + self._third_party_event_rules = hs.get_third_party_event_rules() + self._notifier = hs.get_notifier() - self.is_mine_id = hs.is_mine_id + self._is_mine_id = hs.is_mine_id + self._server_name = hs.hostname self._instance_name = hs.get_instance_name() - self.config = hs.config + self._config = hs.config self._ephemeral_messages_enabled = hs.config.server.enable_ephemeral_messages self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) @@ -175,7 +177,7 @@ class FederationEventHandler(BaseHandler): event_id = pdu.event_id # We reprocess pdus when we have seen them only as outliers - existing = await self.store.get_event( + existing = await self._store.get_event( event_id, allow_none=True, allow_rejected=True ) @@ -221,7 +223,7 @@ class FederationEventHandler(BaseHandler): # Note that if we were never in the room then we would have already # dropped the event, since we wouldn't know the room version. is_in_room = await self._event_auth_handler.check_host_in_room( - room_id, self.server_name + room_id, self._server_name ) if not is_in_room: logger.info( @@ -238,7 +240,7 @@ class FederationEventHandler(BaseHandler): # - Fetching state if we have a hole in the graph if not pdu.internal_metadata.is_outlier(): prevs = set(pdu.prev_event_ids()) - seen = await self.store.have_events_in_timeline(prevs) + seen = await self._store.have_events_in_timeline(prevs) missing_prevs = prevs - seen if missing_prevs: @@ -272,7 +274,7 @@ class FederationEventHandler(BaseHandler): # Update the set of things we've seen after trying to # fetch the missing stuff - seen = await self.store.have_events_in_timeline(prevs) + seen = await self._store.have_events_in_timeline(prevs) missing_prevs = prevs - seen if not missing_prevs: @@ -361,7 +363,7 @@ class FederationEventHandler(BaseHandler): # the room, so we send it on their behalf. event.internal_metadata.send_on_behalf_of = origin - context = await self.state_handler.compute_event_context(event) + context = await self._state_handler.compute_event_context(event) context = await self._check_event_auth(origin, event, context) if context.rejected: raise SynapseError( @@ -375,7 +377,7 @@ class FederationEventHandler(BaseHandler): # for knock events, we run the third-party event rules. It's not entirely clear # why we don't do this for other sorts of membership events. if event.membership == Membership.KNOCK: - event_allowed, _ = await self.third_party_event_rules.check_event_allowed( + event_allowed, _ = await self._third_party_event_rules.check_event_allowed( event, context ) if not event_allowed: @@ -404,7 +406,7 @@ class FederationEventHandler(BaseHandler): prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None) prev_member_event = None if prev_member_event_id: - prev_member_event = await self.store.get_event(prev_member_event_id) + prev_member_event = await self._store.get_event(prev_member_event_id) # Check if the member should be allowed access via membership in a space. await self._event_auth_handler.check_restricted_join_rules( @@ -434,10 +436,10 @@ class FederationEventHandler(BaseHandler): server from invalid events (there is probably no point in trying to re-fetch invalid events from every other HS in the room.) """ - if dest == self.server_name: + if dest == self._server_name: raise SynapseError(400, "Can't backfill from self.") - events = await self.federation_client.backfill( + events = await self._federation_client.backfill( dest, room_id, limit=limit, extremities=extremities ) @@ -469,12 +471,12 @@ class FederationEventHandler(BaseHandler): room_id = pdu.room_id event_id = pdu.event_id - seen = await self.store.have_events_in_timeline(prevs) + seen = await self._store.have_events_in_timeline(prevs) if not prevs - seen: return - latest_list = await self.store.get_latest_event_ids_in_room(room_id) + latest_list = await self._store.get_latest_event_ids_in_room(room_id) # We add the prev events that we have seen to the latest # list to ensure the remote server doesn't give them to us @@ -536,7 +538,7 @@ class FederationEventHandler(BaseHandler): # All that said: Let's try increasing the timeout to 60s and see what happens. try: - missing_events = await self.federation_client.get_missing_events( + missing_events = await self._federation_client.get_missing_events( origin, room_id, earliest_events_ids=list(latest), @@ -609,7 +611,7 @@ class FederationEventHandler(BaseHandler): event_id = event.event_id - existing = await self.store.get_event( + existing = await self._store.get_event( event_id, allow_none=True, allow_rejected=True ) if existing: @@ -674,7 +676,7 @@ class FederationEventHandler(BaseHandler): event_id = event.event_id prevs = set(event.prev_event_ids()) - seen = await self.store.have_events_in_timeline(prevs) + seen = await self._store.have_events_in_timeline(prevs) missing_prevs = prevs - seen if not missing_prevs: @@ -691,7 +693,7 @@ class FederationEventHandler(BaseHandler): event_map = {event_id: event} try: # Get the state of the events we know about - ours = await self.state_store.get_state_groups_ids(room_id, seen) + ours = await self._state_store.get_state_groups_ids(room_id, seen) # state_maps is a list of mappings from (type, state_key) to event_id state_maps: List[StateMap[str]] = list(ours.values()) @@ -720,13 +722,13 @@ class FederationEventHandler(BaseHandler): for x in remote_state: event_map[x.event_id] = x - room_version = await self.store.get_room_version_id(room_id) + room_version = await self._store.get_room_version_id(room_id) state_map = await self._state_resolution_handler.resolve_events_with_store( room_id, room_version, state_maps, event_map, - state_res_store=StateResolutionStore(self.store), + state_res_store=StateResolutionStore(self._store), ) # We need to give _process_received_pdu the actual state events @@ -734,7 +736,7 @@ class FederationEventHandler(BaseHandler): # First though we need to fetch all the events that are in # state_map, so we can build up the state below. - evs = await self.store.get_events( + evs = await self._store.get_events( list(state_map.values()), get_prev_content=False, redact_behaviour=EventRedactBehaviour.AS_IS, @@ -774,7 +776,7 @@ class FederationEventHandler(BaseHandler): ( state_event_ids, auth_event_ids, - ) = await self.federation_client.get_room_state_ids( + ) = await self._federation_client.get_room_state_ids( destination, room_id, event_id=event_id ) @@ -788,7 +790,7 @@ class FederationEventHandler(BaseHandler): desired_events = set(state_event_ids) desired_events.add(event_id) logger.debug("Fetching %i events from cache/store", len(desired_events)) - fetched_events = await self.store.get_events( + fetched_events = await self._store.get_events( desired_events, allow_rejected=True ) @@ -809,7 +811,7 @@ class FederationEventHandler(BaseHandler): missing_auth_events = set(auth_event_ids) - fetched_events.keys() missing_auth_events.difference_update( - await self.store.have_seen_events(room_id, missing_auth_events) + await self._store.have_seen_events(room_id, missing_auth_events) ) logger.debug("We are also missing %i auth events", len(missing_auth_events)) @@ -822,7 +824,7 @@ class FederationEventHandler(BaseHandler): # we need to make sure we re-load from the database to get the rejected # state correct. fetched_events.update( - await self.store.get_events(missing_desired_events, allow_rejected=True) + await self._store.get_events(missing_desired_events, allow_rejected=True) ) # check for events which were in the wrong room. @@ -901,7 +903,7 @@ class FederationEventHandler(BaseHandler): logger.debug("Processing event: %s", event) try: - context = await self.state_handler.compute_event_context( + context = await self._state_handler.compute_event_context( event, old_state=state ) await self._auth_and_persist_event( @@ -919,7 +921,7 @@ class FederationEventHandler(BaseHandler): device_id = event.content.get("device_id") sender_key = event.content.get("sender_key") - cached_devices = await self.store.get_cached_devices_for_user(event.sender) + cached_devices = await self._store.get_cached_devices_for_user(event.sender) resync = False # Whether we should resync device lists. @@ -995,10 +997,10 @@ class FederationEventHandler(BaseHandler): """ try: - await self.store.mark_remote_user_device_cache_as_stale(sender) + await self._store.mark_remote_user_device_cache_as_stale(sender) # Immediately attempt a resync in the background - if self.config.worker_app: + if self._config.worker_app: await self._user_device_resync(user_id=sender) else: await self._device_list_updater.user_device_resync(sender) @@ -1023,9 +1025,15 @@ class FederationEventHandler(BaseHandler): return # Skip processing a marker event if the room version doesn't - # support it. - room_version = await self.store.get_room_version(marker_event.room_id) - if not room_version.msc2716_historical: + # support it or the event is not from the room creator. + room_version = await self._store.get_room_version(marker_event.room_id) + create_event = await self._store.get_create_event_for_room(marker_event.room_id) + room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) + if ( + not room_version.msc2716_historical + or not self._config.experimental.msc2716_enabled + or marker_event.sender != room_creator + ): return logger.debug("_handle_marker_event: received %s", marker_event) @@ -1048,7 +1056,7 @@ class FederationEventHandler(BaseHandler): [insertion_event_id], ) - insertion_event = await self.store.get_event( + insertion_event = await self._store.get_event( insertion_event_id, allow_none=True ) if insertion_event is None: @@ -1066,7 +1074,7 @@ class FederationEventHandler(BaseHandler): marker_event, ) - await self.store.insert_insertion_extremity( + await self._store.insert_insertion_extremity( insertion_event_id, marker_event.room_id ) @@ -1088,14 +1096,14 @@ class FederationEventHandler(BaseHandler): Logs a warning if we can't find the given event. """ - room_version = await self.store.get_room_version(room_id) + room_version = await self._store.get_room_version(room_id) event_map: Dict[str, EventBase] = {} async def get_event(event_id: str): with nested_logging_context(event_id): try: - event = await self.federation_client.get_pdu( + event = await self._federation_client.get_pdu( [destination], event_id, room_version, @@ -1131,7 +1139,7 @@ class FederationEventHandler(BaseHandler): for aid in event.auth_event_ids() if aid not in event_map ] - persisted_events = await self.store.get_events( + persisted_events = await self._store.get_events( auth_events, allow_rejected=True, ) @@ -1175,7 +1183,7 @@ class FederationEventHandler(BaseHandler): async def prep(ev_info: _NewEventInfo): event = ev_info.event with nested_logging_context(suffix=event.event_id): - res = await self.state_handler.compute_event_context(event) + res = await self._state_handler.compute_event_context(event) res = await self._check_event_auth( origin, event, @@ -1278,7 +1286,7 @@ class FederationEventHandler(BaseHandler): Returns: The updated context object. """ - room_version = await self.store.get_room_version_id(event.room_id) + room_version = await self._store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] if claimed_auth_event_map: @@ -1291,7 +1299,7 @@ class FederationEventHandler(BaseHandler): auth_events_ids = self._event_auth_handler.compute_auth_events( event, prev_state_ids, for_verification=True ) - auth_events_x = await self.store.get_events(auth_events_ids) + auth_events_x = await self._store.get_events(auth_events_ids) auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()} try: @@ -1321,19 +1329,29 @@ class FederationEventHandler(BaseHandler): if not context.rejected: await self._check_for_soft_fail(event, state, backfilled, origin=origin) - - if event.type == EventTypes.GuestAccess and not context.rejected: - await self.maybe_kick_guest_users(event) + await self._maybe_kick_guest_users(event) # If we are going to send this event over federation we precaclculate # the joined hosts. if event.internal_metadata.get_send_on_behalf_of(): - await self.event_creation_handler.cache_joined_hosts_for_event( + await self._event_creation_handler.cache_joined_hosts_for_event( event, context ) return context + async def _maybe_kick_guest_users(self, event: EventBase) -> None: + if event.type != EventTypes.GuestAccess: + return + + guest_access = event.content.get(EventContentFields.GUEST_ACCESS) + if guest_access == GuestAccess.CAN_JOIN: + return + + current_state_map = await self._state_handler.get_current_state(event.room_id) + current_state = list(current_state_map.values()) + await self._get_room_member_handler().kick_guest_users(current_state) + async def _check_for_soft_fail( self, event: EventBase, @@ -1356,7 +1374,7 @@ class FederationEventHandler(BaseHandler): if backfilled or event.internal_metadata.is_outlier(): return - extrem_ids_list = await self.store.get_latest_event_ids_in_room(event.room_id) + extrem_ids_list = await self._store.get_latest_event_ids_in_room(event.room_id) extrem_ids = set(extrem_ids_list) prev_event_ids = set(event.prev_event_ids()) @@ -1365,7 +1383,7 @@ class FederationEventHandler(BaseHandler): # state at the event, so no point rechecking auth for soft fail. return - room_version = await self.store.get_room_version_id(event.room_id) + room_version = await self._store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] # Calculate the "current state". @@ -1382,19 +1400,19 @@ class FederationEventHandler(BaseHandler): # given state at the event. This should correctly handle cases # like bans, especially with state res v2. - state_sets_d = await self.state_store.get_state_groups( + state_sets_d = await self._state_store.get_state_groups( event.room_id, extrem_ids ) state_sets: List[Iterable[EventBase]] = list(state_sets_d.values()) state_sets.append(state) - current_states = await self.state_handler.resolve_events( + current_states = await self._state_handler.resolve_events( room_version, state_sets, event ) current_state_ids: StateMap[str] = { k: e.event_id for k, e in current_states.items() } else: - current_state_ids = await self.state_handler.get_current_state_ids( + current_state_ids = await self._state_handler.get_current_state_ids( event.room_id, latest_event_ids=extrem_ids ) @@ -1410,7 +1428,7 @@ class FederationEventHandler(BaseHandler): e for k, e in current_state_ids.items() if k in auth_types ] - auth_events_map = await self.store.get_events(current_state_ids_list) + auth_events_map = await self._store.get_events(current_state_ids_list) current_auth_events = { (e.type, e.state_key): e for e in auth_events_map.values() } @@ -1481,7 +1499,9 @@ class FederationEventHandler(BaseHandler): # # we start by checking if they are in the store, and then try calling /event_auth/. if missing_auth: - have_events = await self.store.have_seen_events(event.room_id, missing_auth) + have_events = await self._store.have_seen_events( + event.room_id, missing_auth + ) logger.debug("Events %s are in the store", have_events) missing_auth.difference_update(have_events) @@ -1490,7 +1510,7 @@ class FederationEventHandler(BaseHandler): logger.info("auth_events contains unknown events: %s", missing_auth) try: try: - remote_auth_chain = await self.federation_client.get_event_auth( + remote_auth_chain = await self._federation_client.get_event_auth( origin, event.room_id, event.event_id ) except RequestSendFailed as e1: @@ -1499,7 +1519,7 @@ class FederationEventHandler(BaseHandler): logger.info("Failed to get event auth from remote: %s", e1) return context, auth_events - seen_remotes = await self.store.have_seen_events( + seen_remotes = await self._store.have_seen_events( event.room_id, [e.event_id for e in remote_auth_chain] ) @@ -1525,7 +1545,7 @@ class FederationEventHandler(BaseHandler): e.event_id, ) missing_auth_event_context = ( - await self.state_handler.compute_event_context(e) + await self._state_handler.compute_event_context(e) ) await self._auth_and_persist_event( origin, @@ -1566,7 +1586,7 @@ class FederationEventHandler(BaseHandler): # XXX: currently this checks for redactions but I'm not convinced that is # necessary? - different_events = await self.store.get_events_as_list(different_auth) + different_events = await self._store.get_events_as_list(different_auth) for d in different_events: if d.room_id != event.room_id: @@ -1592,8 +1612,8 @@ class FederationEventHandler(BaseHandler): remote_auth_events.update({(d.type, d.state_key): d for d in different_events}) remote_state = remote_auth_events.values() - room_version = await self.store.get_room_version_id(event.room_id) - new_state = await self.state_handler.resolve_events( + room_version = await self._store.get_room_version_id(event.room_id) + new_state = await self._state_handler.resolve_events( room_version, (local_state, remote_state), event ) @@ -1651,7 +1671,7 @@ class FederationEventHandler(BaseHandler): # create a new state group as a delta from the existing one. prev_group = context.state_group - state_group = await self.state_store.store_state_group( + state_group = await self._state_store.store_state_group( event.event_id, event.room_id, prev_group=prev_group, @@ -1683,9 +1703,9 @@ class FederationEventHandler(BaseHandler): not event.internal_metadata.is_outlier() and not backfilled and not context.rejected - and (await self.store.get_min_depth(event.room_id)) <= event.depth + and (await self._store.get_min_depth(event.room_id)) <= event.depth ): - await self.action_generator.handle_push_actions_for_event( + await self._action_generator.handle_push_actions_for_event( event, context ) @@ -1694,7 +1714,7 @@ class FederationEventHandler(BaseHandler): ) except Exception: run_in_background( - self.store.remove_push_actions_from_staging, event.event_id + self._store.remove_push_actions_from_staging, event.event_id ) raise @@ -1719,27 +1739,27 @@ class FederationEventHandler(BaseHandler): The stream ID after which all events have been persisted. """ if not event_and_contexts: - return self.store.get_current_events_token() + return self._store.get_current_events_token() - instance = self.config.worker.events_shard_config.get_instance(room_id) + instance = self._config.worker.events_shard_config.get_instance(room_id) if instance != self._instance_name: # Limit the number of events sent over replication. We choose 200 # here as that is what we default to in `max_request_body_size(..)` for batch in batch_iter(event_and_contexts, 200): result = await self._send_events( instance_name=instance, - store=self.store, + store=self._store, room_id=room_id, event_and_contexts=batch, backfilled=backfilled, ) return result["max_stream_id"] else: - assert self.storage.persistence + assert self._storage.persistence # Note that this returns the events that were persisted, which may not be # the same as were passed in if some were deduplicated due to transaction IDs. - events, max_stream_token = await self.storage.persistence.persist_events( + events, max_stream_token = await self._storage.persistence.persist_events( event_and_contexts, backfilled=backfilled ) @@ -1773,7 +1793,7 @@ class FederationEventHandler(BaseHandler): # users if event.internal_metadata.is_outlier(): if event.membership != Membership.INVITE: - if not self.is_mine_id(target_user_id): + if not self._is_mine_id(target_user_id): return target_user = UserID.from_string(target_user_id) @@ -1787,7 +1807,7 @@ class FederationEventHandler(BaseHandler): event_pos = PersistedEventPosition( self._instance_name, event.internal_metadata.stream_ordering ) - self.notifier.on_new_room_event( + self._notifier.on_new_room_event( event, event_pos, max_stream_token, extra_users=extra_users ) @@ -1822,4 +1842,4 @@ class FederationEventHandler(BaseHandler): raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events") async def get_min_depth_for_context(self, context: str) -> int: - return await self.store.get_min_depth(context) + return await self._store.get_min_depth(context) |