Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions awx/main/models/credential/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ def get_input(self, field_name, **kwargs):
:param field_name(str): The name of the input field.
:param default(optional[str]): A default return value to use.
"""
if self.credential_type.kind != 'external' and field_name in self.dynamic_input_fields:
if field_name in self.dynamic_input_fields:
return self._get_dynamic_input(field_name)
if field_name in self.credential_type.secret_fields:
try:
Expand Down Expand Up @@ -1276,13 +1276,32 @@ class Meta:
metadata = DynamicCredentialInputField(blank=True, default=dict)

def clean_target_credential(self):
if self.target_credential.credential_type.kind == 'external':
raise ValidationError(_('Target must be a non-external credential'))
if self.target_credential == self.source_credential:
raise ValidationError(_('Target and source credentials must be different'))
return self.target_credential
Comment on lines 1278 to 1281
Copy link

Copilot AI Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes API validation semantics: CredentialInputSource.clean_target_credential no longer rejects external target credentials, so existing functional tests (e.g. test_create_credential_input_source_with_external_target_returns_400) and any clients relying on the old error will break. Please update/replace the test expectation and add coverage for the newly-supported external→external input sourcing behavior.

Copilot uses AI. Check for mistakes.

def clean_source_credential(self):
if self.source_credential.credential_type.kind != 'external':
raise ValidationError(_('Source must be an external credential'))

# Cycle detection: check if adding this link would create a cycle
visited = set()
def has_cycle(cred_id, target_id):
if cred_id == target_id:
return True
if cred_id in visited:
return False
visited.add(cred_id)

# Check all sources that point to this credential
for input_source in CredentialInputSource.objects.filter(target_credential_id=cred_id):
if has_cycle(input_source.source_credential_id, target_id):
return True
return False

if has_cycle(self.source_credential_id, self.target_credential_id):
raise ValidationError(_('This would create a circular dependency between credentials'))

return self.source_credential

def clean_input_field_name(self):
Expand All @@ -1300,6 +1319,12 @@ def get_input_value(self):
else:
backend_kwargs[field_name] = value

# Resolve any dynamic inputs on the source credential
# (e.g., a field sourced from another external credential like Azure KV)
for field_name in self.source_credential.dynamic_input_fields:
if field_name not in backend_kwargs:
backend_kwargs[field_name] = self.source_credential._get_dynamic_input(field_name)

backend_kwargs.update(self.metadata)

with set_environ(**settings.AWX_TASK_ENV):
Expand Down
119 changes: 116 additions & 3 deletions awx/main/tests/functional/api/test_credential_input_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def test_create_from_list(get, post, admin, vault_credential, external_credentia


@pytest.mark.django_db
def test_create_credential_input_source_with_external_target_returns_400(post, admin, external_credential, other_external_credential):
def test_create_credential_input_source_with_external_target_returns_201(post, admin, external_credential, other_external_credential):
list_url = reverse(
'api:credential_input_source_list',
)
Expand All @@ -95,8 +95,7 @@ def test_create_credential_input_source_with_external_target_returns_400(post, a
'metadata': {'key': 'some_key'},
}
response = post(list_url, params, admin)
assert response.status_code == 400
assert response.data['target_credential'] == ['Target must be a non-external credential']
assert response.status_code == 201


@pytest.mark.django_db
Expand Down Expand Up @@ -316,3 +315,117 @@ def test_create_credential_input_source_with_already_used_input_returns_400(post
]
all_responses = [post(list_url, params, admin) for params in all_params]
assert all_responses.pop().status_code == 400


@pytest.mark.django_db
def test_create_credential_input_source_same_target_and_source_returns_400(post, admin, external_credential):
"""Target and source credential cannot be the same credential."""
list_url = reverse('api:credential_input_source_list')
params = {
'target_credential': external_credential.pk,
'source_credential': external_credential.pk,
'input_field_name': 'token',
'metadata': {'key': 'some_key'},
}
response = post(list_url, params, admin)
assert response.status_code == 400
assert b'Target and source credentials must be different' in response.content


@pytest.mark.django_db
def test_create_credential_input_source_circular_dependency_returns_400(post, admin, credentialtype_external, external_credential, other_external_credential):
"""Creating an input source that would form a cycle between credentials should be rejected."""
from awx.main.models import Credential

third_external = Credential.objects.create(
credential_type=credentialtype_external,
name='third-external-cred',
inputs={'url': 'http://thirdhost.com', 'token': 'secret3'},
)
list_url = reverse('api:credential_input_source_list')

# Set up chain: external_credential -> other_external_credential -> third_external
response = post(
list_url,
{
'target_credential': external_credential.pk,
'source_credential': other_external_credential.pk,
'input_field_name': 'token',
'metadata': {'key': 'key1'},
},
admin,
)
assert response.status_code == 201

response = post(
list_url,
{
'target_credential': other_external_credential.pk,
'source_credential': third_external.pk,
'input_field_name': 'token',
'metadata': {'key': 'key2'},
},
admin,
)
assert response.status_code == 201

# Now try to close the cycle: third_external sourced from external_credential
response = post(
list_url,
{
'target_credential': third_external.pk,
'source_credential': external_credential.pk,
'input_field_name': 'token',
'metadata': {'key': 'key3'},
},
admin,
)
assert response.status_code == 400
assert b'circular dependency' in response.content


@pytest.mark.django_db
def test_external_credential_get_input_resolves_dynamic_input(external_credential, other_external_credential):
"""get_input on any credential type should resolve dynamic (externally-sourced) inputs."""
input_source = CredentialInputSource.objects.create(
target_credential=external_credential,
source_credential=other_external_credential,
input_field_name='token',
metadata={'key': 'some_key'},
)
# The mock backend returns 'secret1', so the dynamic input should resolve
assert external_credential.get_input('token') == 'secret1'


@pytest.mark.django_db
def test_chained_external_credential_resolves_source_dynamic_inputs(credentialtype_external, external_credential, other_external_credential):
"""When a source credential itself has dynamic inputs, get_input_value should resolve them."""
# Create a third external credential to act as the leaf source
from awx.main.models import Credential

third_external = Credential.objects.create(
credential_type=credentialtype_external,
name='third-external-cred',
inputs={'url': 'http://thirdhost.com', 'token': 'secret3'},
)

# Chain: other_external_credential.token is sourced from third_external
CredentialInputSource.objects.create(
target_credential=other_external_credential,
source_credential=third_external,
input_field_name='token',
metadata={'key': 'chained_key'},
)

# Now create an input source where other_external_credential is the source
input_source = CredentialInputSource.objects.create(
target_credential=external_credential,
source_credential=other_external_credential,
input_field_name='token',
metadata={'key': 'final_key'},
)

# get_input_value should succeed — the source credential's dynamic 'token'
# field should be resolved before being passed to the backend
result = input_source.get_input_value()
assert result == 'secret'
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {
import { PficonHistoryIcon } from '@patternfly/react-icons';
import { PasswordInput } from 'components/FormField';
import AnsibleSelect from 'components/AnsibleSelect';
import Popover from 'components/Popover';
import { CredentialType } from 'types';
import { required } from 'util/validators';
import { CredentialPluginField } from '../CredentialPlugins';
Expand Down Expand Up @@ -220,32 +219,6 @@ function CredentialField({ credentialType, fieldOptions }) {
</FormGroup>
);
}
if (credentialType.kind === 'external') {
return (
<FormGroup
fieldId={`credential-${fieldOptions.id}`}
helperTextInvalid={meta.error}
label={fieldOptions.label}
labelIcon={
fieldOptions.help_text && <Popover content={fieldOptions.help_text} />
}
isRequired={isRequired}
validated={isValid ? 'default' : 'error'}
>
<CredentialInput
credentialKind={credentialType.kind}
fieldOptions={fieldOptions}
isDisabled={
!!(
meta.initialValue &&
meta.initialValue !== '' &&
meta.value === meta.initialValue
)
}
/>
</FormGroup>
);
}
if (credentialType.kind === 'ssh' && fieldOptions.id === 'become_method') {
return (
<BecomeMethodField fieldOptions={fieldOptions} isRequired={isRequired} />
Expand Down