Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Custom CA for certificate issuing #438

Merged
merged 38 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 44 additions & 43 deletions confidant/routes/certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

from confidant import authnz, settings
from confidant.services import certificatemanager
from confidant.services.certificate_authority.certificateauthoritybase import (
CertificateAuthorityNotFoundError,
CertificateNotReadyError,
)
from confidant.schema.certificates import (
certificate_authority_response_schema,
certificate_authorities_response_schema,
Expand Down Expand Up @@ -69,7 +73,7 @@ def get_certificate(ca, cn):
'''
try:
ca_object = certificatemanager.get_ca(ca)
except certificatemanager.CertificateAuthorityNotFoundError:
except CertificateAuthorityNotFoundError:
return jsonify({'error': 'Provided CA not found.'}), 404
san = request.args.getlist('san')

Expand All @@ -83,8 +87,9 @@ def get_certificate(ca, cn):
'san': san,
},
):
msg = ('{} does not have access to get certificate cn {} against'
' ca {}').format(
msg = (
'{} does not have access to get certificate cn {} against' ' ca {}'
).format(
authnz.get_logged_in_user(),
cn,
ca,
Expand All @@ -93,11 +98,10 @@ def get_certificate(ca, cn):
return jsonify(error_msg), 403

logger.info(
'get_certificate called on id={} for ca={} by user={}'.format(
cn,
ca,
logged_in_user,
)
'get_certificate called on id=%s for ca=%s by user=%s',
cn,
ca,
logged_in_user,
)

validity = request.args.get(
Expand All @@ -111,7 +115,7 @@ def get_certificate(ca, cn):
validity,
san,
)
except certificatemanager.CertificateNotReadyError:
except CertificateNotReadyError:
# Ratelimit response for a locked certificate in the cache
error_msg = 'Certificate being requested, please wait and try again.'
response = jsonify(error_msg)
Expand Down Expand Up @@ -173,13 +177,17 @@ def get_certificate_from_csr(ca):
'''
try:
ca_object = certificatemanager.get_ca(ca)
except certificatemanager.CertificateAuthorityNotFoundError:
except CertificateAuthorityNotFoundError:
return jsonify({'error': 'Provided CA not found.'}), 404
data = request.get_json()

if not data or not data.get('csr'):
return jsonify(
{'error': 'csr must be provided in the POST body.'},
), 400
return (
jsonify(
{'error': 'csr must be provided in the POST body.'},
),
400,
)
validity = data.get(
'validity',
ca_object.settings['max_validity_days'],
Expand All @@ -188,9 +196,12 @@ def get_certificate_from_csr(ca):
csr = ca_object.decode_csr(data['csr'])
except Exception:
logger.exception('Failed to decode PEM csr')
return jsonify(
{'error': 'csr could not be decoded'},
), 400
return (
jsonify(
{'error': 'csr could not be decoded'},
),
400,
)
# Get the cn and san values from the csr object, so that we can use them
# for the ACL check.
cn = ca_object.get_csr_common_name(csr)
Expand All @@ -206,28 +217,24 @@ def get_certificate_from_csr(ca):
'san': san,
},
):
msg = ('{} does not have access to get certificate cn {} against'
' ca {}').format(
authnz.get_logged_in_user(),
cn,
ca,
msg = (
f'{authnz.get_logged_in_user()} does not have access to get'
'certificate cn {cn} against ca {ca}'
)
error_msg = {'error': msg, 'reference': cn}
return jsonify(error_msg), 403

logger.info(
'get_certificate called on id={} for ca={} by user={}'.format(
cn,
ca,
logged_in_user,
)
'get_certificate called on id=%s for ca=%s by user=%s',
cn,
ca,
logged_in_user,
)

arn = ca_object.issue_certificate(data['csr'], validity)
certificate = ca_object.get_certificate_from_arn(arn)
certificate_json = ca_object.issue_certificate(data['csr'], validity)
certificate_response = CertificateResponse(
certificate=certificate['certificate'],
certificate_chain=certificate['certificate_chain'],
certificate=certificate_json['certificate'],
certificate_chain=certificate_json['certificate_chain'],
)
return certificate_response_schema.dumps(certificate_response)

Expand Down Expand Up @@ -284,7 +291,7 @@ def list_cas():

cas = certificatemanager.list_cas()

logger.info('list_cas called by user={}'.format(logged_in_user))
logger.info('list_cas called by user=%s', logged_in_user)

cas_response = CertificateAuthoritiesResponse.from_cas(cas)
return certificate_authorities_response_schema.dumps(cas_response)
Expand Down Expand Up @@ -331,29 +338,23 @@ def get_ca(ca):
'''
try:
ca_object = certificatemanager.get_ca(ca)
except certificatemanager.CertificateAuthorityNotFoundError:
except CertificateAuthorityNotFoundError:
return jsonify({'error': 'Provided CA not found.'}), 404

logged_in_user = authnz.get_logged_in_user()

if not acl_module_check(
resource_type='ca',
action='get',
resource_id=ca,
):
msg = '{} does not have access to get ca {}'.format(
authnz.get_logged_in_user(),
ca,
)
msg = f'''
{authnz.get_logged_in_user()} does not have access to get ca {ca}
'''
error_msg = {'error': msg, 'reference': ca}
return jsonify(error_msg), 403

logger.info(
'get_ca called on id={} by user={}'.format(
ca,
logged_in_user,
)
)

logger.info('get_ca called on id=%s by user=%s', ca, logged_in_user)
_ca = ca_object.get_certificate_authority_certificate()
ca_response = CertificateAuthorityResponse(
ca=_ca['ca'],
Expand Down
Loading
Loading