From e85656611c0172aea7fc990aa16fdf3bdbcc1218 Mon Sep 17 00:00:00 2001 From: Dmitriy Musatkin <63878209+DmitriyMusatkin@users.noreply.github.com> Date: Tue, 19 Nov 2024 11:22:13 -0800 Subject: [PATCH] RSA PKCS 1.5 SHA1 signing support and der helper functions (#609) Co-authored-by: Michael Graeb --- awscrt/crypto.py | 25 +++++++++++- crt/aws-c-cal | 2 +- source/crypto.c | 57 +++++++++++++++++++++++++++ source/crypto.h | 3 ++ source/module.c | 2 + test/test_crypto.py | 95 ++++++++++++++++++++++++++++++++++++++++++--- 6 files changed, 176 insertions(+), 8 deletions(-) diff --git a/awscrt/crypto.py b/awscrt/crypto.py index 35b7027fa..dd344d927 100644 --- a/awscrt/crypto.py +++ b/awscrt/crypto.py @@ -91,7 +91,12 @@ class RSASignatureAlgorithm(IntEnum): PKCSv1.5 padding with sha256 hash function """ - PSS_SHA256 = 1 + PKCS1_5_SHA1 = 1 + """ + PKCSv1.5 padding with sha1 hash function + """ + + PSS_SHA256 = 2 """ PSS padding with sha256 hash function """ @@ -118,6 +123,24 @@ def new_public_key_from_pem_data(pem_data: Union[str, bytes, bytearray, memoryvi """ return RSA(binding=_awscrt.rsa_public_key_from_pem_data(pem_data)) + @staticmethod + def new_private_key_from_der_data(der_data: Union[bytes, bytearray, memoryview]) -> 'RSA': + """ + Creates a new instance of private RSA key pair from der data. + Expects key in PKCS1 format. + Raises ValueError if pem does not have private key object. + """ + return RSA(binding=_awscrt.rsa_private_key_from_der_data(der_data)) + + @staticmethod + def new_public_key_from_der_data(der_data: Union[bytes, bytearray, memoryview]) -> 'RSA': + """ + Creates a new instance of public RSA key pair from der data. + Expects key in PKCS1 format. + Raises ValueError if pem does not have public key object. + """ + return RSA(binding=_awscrt.rsa_public_key_from_der_data(der_data)) + def encrypt(self, encryption_algorithm: RSAEncryptionAlgorithm, plaintext: Union[bytes, bytearray, memoryview]) -> bytes: """ diff --git a/crt/aws-c-cal b/crt/aws-c-cal index 656762aef..fbbe2612a 160000 --- a/crt/aws-c-cal +++ b/crt/aws-c-cal @@ -1 +1 @@ -Subproject commit 656762aefbee2bc8f509cb23cd107abff20a72bb +Subproject commit fbbe2612a3385d1ded02a52d20ad7fd2da4501f4 diff --git a/source/crypto.c b/source/crypto.c index 249e9276f..b996ecbd3 100644 --- a/source/crypto.c +++ b/source/crypto.c @@ -8,6 +8,7 @@ #include "aws/cal/hash.h" #include "aws/cal/hmac.h" #include "aws/cal/rsa.h" +#include "aws/common/encoding.h" #include "aws/io/pem.h" const char *s_capsule_name_hash = "aws_hash"; @@ -350,6 +351,62 @@ PyObject *aws_py_rsa_public_key_from_pem_data(PyObject *self, PyObject *args) { return capsule; } +PyObject *aws_py_rsa_private_key_from_der_data(PyObject *self, PyObject *args) { + (void)self; + + struct aws_byte_cursor der_data_cur; + if (!PyArg_ParseTuple(args, "y#", &der_data_cur.ptr, &der_data_cur.len)) { + return NULL; + } + + PyObject *capsule = NULL; + struct aws_allocator *allocator = aws_py_get_allocator(); + + struct aws_rsa_key_pair *key_pair = aws_rsa_key_pair_new_from_private_key_pkcs1(allocator, der_data_cur); + + if (key_pair == NULL) { + PyErr_AwsLastError(); + goto on_done; + } + + capsule = PyCapsule_New(key_pair, s_capsule_name_rsa, s_rsa_destructor); + + if (capsule == NULL) { + aws_rsa_key_pair_release(key_pair); + } + +on_done: + return capsule; +} + +PyObject *aws_py_rsa_public_key_from_der_data(PyObject *self, PyObject *args) { + (void)self; + + struct aws_byte_cursor der_data_cur; + if (!PyArg_ParseTuple(args, "y#", &der_data_cur.ptr, &der_data_cur.len)) { + return NULL; + } + + PyObject *capsule = NULL; + struct aws_allocator *allocator = aws_py_get_allocator(); + + struct aws_rsa_key_pair *key_pair = aws_rsa_key_pair_new_from_public_key_pkcs1(allocator, der_data_cur); + + if (key_pair == NULL) { + PyErr_AwsLastError(); + goto on_done; + } + + capsule = PyCapsule_New(key_pair, s_capsule_name_rsa, s_rsa_destructor); + + if (capsule == NULL) { + aws_rsa_key_pair_release(key_pair); + } + +on_done: + return capsule; +} + PyObject *aws_py_rsa_encrypt(PyObject *self, PyObject *args) { (void)self; diff --git a/source/crypto.h b/source/crypto.h index 4c03e65a4..3e8db5f74 100644 --- a/source/crypto.h +++ b/source/crypto.h @@ -32,6 +32,9 @@ PyObject *aws_py_sha256_hmac_compute(PyObject *self, PyObject *args); PyObject *aws_py_rsa_private_key_from_pem_data(PyObject *self, PyObject *args); PyObject *aws_py_rsa_public_key_from_pem_data(PyObject *self, PyObject *args); +PyObject *aws_py_rsa_private_key_from_der_data(PyObject *self, PyObject *args); +PyObject *aws_py_rsa_public_key_from_der_data(PyObject *self, PyObject *args); + PyObject *aws_py_rsa_encrypt(PyObject *self, PyObject *args); PyObject *aws_py_rsa_decrypt(PyObject *self, PyObject *args); PyObject *aws_py_rsa_sign(PyObject *self, PyObject *args); diff --git a/source/module.c b/source/module.c index 197ce20f1..293ebca30 100644 --- a/source/module.c +++ b/source/module.c @@ -755,6 +755,8 @@ static PyMethodDef s_module_methods[] = { /* RSA crypto primitives */ AWS_PY_METHOD_DEF(rsa_private_key_from_pem_data, METH_VARARGS), AWS_PY_METHOD_DEF(rsa_public_key_from_pem_data, METH_VARARGS), + AWS_PY_METHOD_DEF(rsa_private_key_from_der_data, METH_VARARGS), + AWS_PY_METHOD_DEF(rsa_public_key_from_der_data, METH_VARARGS), AWS_PY_METHOD_DEF(rsa_encrypt, METH_VARARGS), AWS_PY_METHOD_DEF(rsa_decrypt, METH_VARARGS), AWS_PY_METHOD_DEF(rsa_sign, METH_VARARGS), diff --git a/test/test_crypto.py b/test/test_crypto.py index 7c74a6335..008b1645b 100644 --- a/test/test_crypto.py +++ b/test/test_crypto.py @@ -4,6 +4,7 @@ from test import NativeResourceTest from awscrt.crypto import Hash, RSA, RSAEncryptionAlgorithm, RSASignatureAlgorithm +import base64 import unittest RSA_PRIVATE_KEY_PEM = """ @@ -47,6 +48,41 @@ -----END RSA PUBLIC KEY----- """ +RSA_PUBLIC_KEY_DER_BASE64 = ( + 'MIIBCgKCAQEAxaEsLWE2t3kJqsF1sFHYk7rSCGfGTSDa+3r5typT0cb/TtJ989C8' + 'dLcfInx4Dxq0ewo6NOxQ/TD8JevUda86jSh1UKEQUOl7qy+QwOhFMpwHq/uOgMy5' + 'khDDLlkxD5U32RrDfqLK+4WUDapHlQ6g+E6wS1j1yDRoTZJk3WnTpR0sJHsttLWV' + '+mb2wPC7TkhGMbFMzbt6v0ahF7abVOOGiHVZ77uhS66hgP9nfgMHug8EN/xmVc/T' + 'xgMJci1Irh66xVZQ9aT2OZwb0TXglULm+b8HM+GKHgoTMwr9gAGpFDoYi22PvxC/' + 'cqKHKIaYw7KNOPwImzQ6cp5oQJTAPQKRUwIDAQAB') + +RSA_PRIVATE_KEY_DER_BASE64 = ( + 'MIIEowIBAAKCAQEAxaEsLWE2t3kJqsF1sFHYk7rSCGfGTSDa+3r5typT0cb/TtJ9' + '89C8dLcfInx4Dxq0ewo6NOxQ/TD8JevUda86jSh1UKEQUOl7qy+QwOhFMpwHq/uO' + 'gMy5khDDLlkxD5U32RrDfqLK+4WUDapHlQ6g+E6wS1j1yDRoTZJk3WnTpR0sJHst' + 'tLWV+mb2wPC7TkhGMbFMzbt6v0ahF7abVOOGiHVZ77uhS66hgP9nfgMHug8EN/xm' + 'Vc/TxgMJci1Irh66xVZQ9aT2OZwb0TXglULm+b8HM+GKHgoTMwr9gAGpFDoYi22P' + 'vxC/cqKHKIaYw7KNOPwImzQ6cp5oQJTAPQKRUwIDAQABAoIBACcuUfTZPiDX1UvO' + 'OQfw4hA/zJ4v/MeTyPZspg9jS+TeIAW/g4sQChzVpU2QAbl04O031NxjMZdQ29yk' + 'yaVfTStpJwEKPZLdB1CkCH3GTtm+x2KYZ+MvM2c6/Yc11Z0yRzU6siFsIvQEwpqG' + '9NQfZ1hzOU5m36uGgFtIt8iRz4z/RxpZUOXpaEosb0uMK3VPBuZBu8uVQBFdyAA7' + 'xAGtJphxQ5u0Ct9aidPjD7MhCVzcb2XbgCgxb2hbCmDMOgeNVYrTo2fdBzNxLcXv' + 'j4sUNmO+mLbUMFOePuP8JZaGNTTmznZkavskozfdbubuS3/4/0HH1goytFheVt1B' + 'vfxzpgkCgYEA9QgEMKny0knDHV7BC2uAd7Vvd+5iikA3WdJ9i11zas9AbMMmf9cX' + 'E3xNt6DO42hnVCNN4uAWH5uGWltWZ8pmGKk6mesqZfYPsyTz1cK6fP6KyQrkWRNT' + 'V3nRMEMbziAWxFD5hxP9p1KlqI2Py+W4fJ0LGZ4Mwvn3dKYOilxK+50CgYEAznny' + 'ZxQiJGt8/FtH9f/GDIY24Cz53Cuj+BWG2EH4kLo24ET2QTVvohFJVCm3Hf8Qe4cA' + 'ASabRUg1vS4Tr2FmIqD2Iw/ogSmDcJdYuwhdtWKa8fDbehCN5hmXjn2WKYvjvZNv' + 'Gcx6gfqULD9SaQv+N7lL8eJxKiLLBeVYD7qoha8CgYA8udnf/Z5yQ1mZw8vv+pqC' + 'EHMps+iz/qo5FpOKoIRkKiz7R3oZIMNVTu8r3Syo600Aayd4XLTe7HplllFZs62N' + '2xLs5n1Be7P0X+oWRgZVx/e5T3u8H6/98/DGFzui4A0EZlURBwFMII1xsnO6wpnw' + 'ODNyC9t5zt1nCWh9HdZveQKBgAm4+E8eRZVNcm83pSXSS3Mfhsn7lDBn5aqy6Mya' + 'HqhB/H+G/8mGSKFrCvbpl/PTpOUMMFXdiYYzpkQoPUkO3w5WYgC4qQwb9lKA7e6w' + 'sCjwYbduzgbrbKMfJWHSTBXcvnaY0Kx4UnR4Zi3HNYw4wlnBYfAb55RCWykF6aWj' + '9neFAoGBAMqQA2YWCHhnRtjn4iGMrTk8iOHBd8AGBBzX9rPKXDqWlOr/iQq90qX0' + '59309stR/bAhMzxOx31777XEPO1md854iXXr0XDMQlwCYkWyWb6hp4JlsqFBPMjn' + 'nGXWA0Gp6UWgpg4Hvjdsu+0FQ3AhDMBKZZ8fBFb4EW+HRQIHPnbH') + class TestCredentials(NativeResourceTest): @@ -134,21 +170,68 @@ def test_rsa_encryption_roundtrip(self): pt_pub = rsa.decrypt(p, ct_pub) self.assertEqual(test_pt, pt_pub) - def test_rsa_signing_roundtrip(self): - h = Hash.sha256_new() - h.update(b'totally original test string') - digest = h.digest() + def test_rsa_encryption_roundtrip_der(self): + param_list = [RSAEncryptionAlgorithm.PKCS1_5, + RSAEncryptionAlgorithm.OAEP_SHA256, + RSAEncryptionAlgorithm.OAEP_SHA512] + for p in param_list: + with self.subTest(msg="RSA Encryption Roundtrip using algo p", p=p): + test_pt = b'totally original test string' + private_key_der_bytes = base64.b64decode(RSA_PRIVATE_KEY_DER_BASE64) + rsa = RSA.new_private_key_from_der_data(private_key_der_bytes) + ct = rsa.encrypt(p, test_pt) + pt = rsa.decrypt(p, ct) + self.assertEqual(test_pt, pt) + + public_key_der_bytes = base64.b64decode(RSA_PUBLIC_KEY_DER_BASE64) + rsa_pub = RSA.new_public_key_from_der_data(public_key_der_bytes) + ct_pub = rsa_pub.encrypt(p, test_pt) + pt_pub = rsa.decrypt(p, ct_pub) + self.assertEqual(test_pt, pt_pub) + + def test_rsa_signing_roundtrip(self): param_list = [RSASignatureAlgorithm.PKCS1_5_SHA256, - RSASignatureAlgorithm.PSS_SHA256] + RSASignatureAlgorithm.PSS_SHA256, + RSASignatureAlgorithm.PKCS1_5_SHA1] for p in param_list: with self.subTest(msg="RSA Signing Roundtrip using algo p", p=p): + if (p == RSASignatureAlgorithm.PKCS1_5_SHA1): + h = Hash.sha1_new() + else: + h = Hash.sha256_new() + h.update(b'totally original test string') + digest = h.digest() + rsa = RSA.new_private_key_from_pem_data(RSA_PRIVATE_KEY_PEM) signature = rsa.sign(p, digest) self.assertTrue(rsa.verify(p, digest, signature)) - rsa_pub = RSA.new_private_key_from_pem_data(RSA_PRIVATE_KEY_PEM) + rsa_pub = RSA.new_public_key_from_pem_data(RSA_PUBLIC_KEY_PEM) + self.assertTrue(rsa_pub.verify(p, digest, signature)) + + def test_rsa_signing_roundtrip_der(self): + param_list = [RSASignatureAlgorithm.PKCS1_5_SHA256, + RSASignatureAlgorithm.PSS_SHA256, + RSASignatureAlgorithm.PKCS1_5_SHA1] + + for p in param_list: + with self.subTest(msg="RSA Signing Roundtrip using algo p", p=p): + if (p == RSASignatureAlgorithm.PKCS1_5_SHA1): + h = Hash.sha1_new() + else: + h = Hash.sha256_new() + h.update(b'totally original test string') + digest = h.digest() + + private_key_der_bytes = base64.b64decode(RSA_PRIVATE_KEY_DER_BASE64) + rsa = RSA.new_private_key_from_der_data(private_key_der_bytes) + signature = rsa.sign(p, digest) + self.assertTrue(rsa.verify(p, digest, signature)) + + public_key_der_bytes = base64.b64decode(RSA_PUBLIC_KEY_DER_BASE64) + rsa_pub = RSA.new_public_key_from_der_data(public_key_der_bytes) self.assertTrue(rsa_pub.verify(p, digest, signature)) def test_rsa_load_error(self):