diff --git a/src/rust/src/x509/csr.rs b/src/rust/src/x509/csr.rs index 8f7f17ff1a71..c6cf5fe04f91 100644 --- a/src/rust/src/x509/csr.rs +++ b/src/rust/src/x509/csr.rs @@ -29,6 +29,7 @@ self_cell::self_cell!( pub(crate) struct CertificateSigningRequest { raw: OwnedCsr, cached_extensions: pyo3::sync::PyOnceLock>, + cached_attributes: pyo3::sync::PyOnceLock>, } #[pyo3::pymethods] @@ -129,32 +130,40 @@ impl CertificateSigningRequest { #[getter] fn attributes<'p>(&self, py: pyo3::Python<'p>) -> pyo3::PyResult> { - let pyattrs = pyo3::types::PyList::empty(py); - for attribute in self - .raw - .borrow_dependent() - .csr_info - .attributes - .unwrap_read() - .clone() - { - check_attribute_length(attribute.values.unwrap_read().clone()).map_err(|_| { - pyo3::exceptions::PyValueError::new_err( - "Only single-valued attributes are supported", - ) - })?; - let oid = oid_to_py_oid(py, &attribute.type_id)?; - let val = attribute.values.unwrap_read().clone().next().unwrap(); - let serialized = pyo3::types::PyBytes::new(py, val.data()); - let tag = val.tag().as_u8().ok_or_else(|| { - CryptographyError::from(pyo3::exceptions::PyValueError::new_err( - "Long-form tags are not supported in CSR attribute values", - )) - })?; - let pyattr = types::ATTRIBUTE.get(py)?.call1((oid, serialized, tag))?; - pyattrs.append(pyattr)?; - } - types::ATTRIBUTES.get(py)?.call1((pyattrs,)) + Ok(self + .cached_attributes + .get_or_try_init(py, || -> pyo3::PyResult> { + let pyattrs = pyo3::types::PyList::empty(py); + for attribute in self + .raw + .borrow_dependent() + .csr_info + .attributes + .unwrap_read() + .clone() + { + check_attribute_length(attribute.values.unwrap_read().clone()).map_err( + |_| { + pyo3::exceptions::PyValueError::new_err( + "Only single-valued attributes are supported", + ) + }, + )?; + let oid = oid_to_py_oid(py, &attribute.type_id)?; + let val = attribute.values.unwrap_read().clone().next().unwrap(); + let serialized = pyo3::types::PyBytes::new(py, val.data()); + let tag = val.tag().as_u8().ok_or_else(|| { + CryptographyError::from(pyo3::exceptions::PyValueError::new_err( + "Long-form tags are not supported in CSR attribute values", + )) + })?; + let pyattr = types::ATTRIBUTE.get(py)?.call1((oid, serialized, tag))?; + pyattrs.append(pyattr)?; + } + Ok(types::ATTRIBUTES.get(py)?.call1((pyattrs,))?.unbind()) + })? + .bind(py) + .clone()) } #[getter] @@ -236,6 +245,7 @@ pub(crate) fn load_der_x509_csr( Ok(CertificateSigningRequest { raw, cached_extensions: pyo3::sync::PyOnceLock::new(), + cached_attributes: pyo3::sync::PyOnceLock::new(), }) } diff --git a/tests/bench/test_x509.py b/tests/bench/test_x509.py index abfbbf92a199..9ed4ccf72948 100644 --- a/tests/bench/test_x509.py +++ b/tests/bench/test_x509.py @@ -9,6 +9,7 @@ import certifi from cryptography import x509 +from cryptography.x509 import ocsp from ..utils import load_vectors_from_file @@ -46,6 +47,128 @@ def test_load_pem_certificate(benchmark): benchmark(x509.load_pem_x509_certificate, cert_bytes) +# --------------------------------------------------------------------------- +# Repeated property access — these measure the cost of the cached fast path. +# Each benchmark constructs the object once, then calls the getter repeatedly. +# --------------------------------------------------------------------------- + + +def test_certificate_subject(benchmark): + cert_bytes = load_vectors_from_file( + os.path.join("x509", "cryptography.io.pem"), + loader=lambda f: f.read(), + mode="rb", + ) + cert = x509.load_pem_x509_certificate(cert_bytes) + benchmark(lambda: cert.subject) + + +def test_certificate_issuer(benchmark): + cert_bytes = load_vectors_from_file( + os.path.join("x509", "cryptography.io.pem"), + loader=lambda f: f.read(), + mode="rb", + ) + cert = x509.load_pem_x509_certificate(cert_bytes) + benchmark(lambda: cert.issuer) + + +def test_certificate_public_key(benchmark): + cert_bytes = load_vectors_from_file( + os.path.join("x509", "cryptography.io.pem"), + loader=lambda f: f.read(), + mode="rb", + ) + cert = x509.load_pem_x509_certificate(cert_bytes) + benchmark(lambda: cert.public_key()) + + +def test_certificate_signature_hash_algorithm(benchmark): + cert_bytes = load_vectors_from_file( + os.path.join("x509", "cryptography.io.pem"), + loader=lambda f: f.read(), + mode="rb", + ) + cert = x509.load_pem_x509_certificate(cert_bytes) + benchmark(lambda: cert.signature_hash_algorithm) + + +def test_certificate_signature_algorithm_oid(benchmark): + cert_bytes = load_vectors_from_file( + os.path.join("x509", "cryptography.io.pem"), + loader=lambda f: f.read(), + mode="rb", + ) + cert = x509.load_pem_x509_certificate(cert_bytes) + benchmark(lambda: cert.signature_algorithm_oid) + + +def test_crl_issuer(benchmark): + crl_bytes = load_vectors_from_file( + os.path.join("x509", "PKITS_data", "crls", "indirectCRLCA5CRL.crl"), + loader=lambda f: f.read(), + mode="rb", + ) + crl = x509.load_der_x509_crl(crl_bytes) + benchmark(lambda: crl.issuer) + + +def test_crl_serial_number_lookup_hit(benchmark): + """Repeated lookup for a serial number present in the CRL.""" + crl_bytes = load_vectors_from_file( + os.path.join("x509", "PKITS_data", "crls", "indirectCRLCA5CRL.crl"), + loader=lambda f: f.read(), + mode="rb", + ) + crl = x509.load_der_x509_crl(crl_bytes) + # Serial 1 is always present in this CRL. + benchmark(lambda: crl.get_revoked_certificate_by_serial_number(1)) + + +def test_crl_serial_number_lookup_miss(benchmark): + """Repeated lookup for a serial number absent from the CRL.""" + crl_bytes = load_vectors_from_file( + os.path.join("x509", "PKITS_data", "crls", "indirectCRLCA5CRL.crl"), + loader=lambda f: f.read(), + mode="rb", + ) + crl = x509.load_der_x509_crl(crl_bytes) + benchmark(lambda: crl.get_revoked_certificate_by_serial_number(99999)) + + +def test_ocsp_request_properties(benchmark): + req_bytes = load_vectors_from_file( + os.path.join("x509", "ocsp", "req-sha1.der"), + loader=lambda f: f.read(), + mode="rb", + ) + req = ocsp.load_der_ocsp_request(req_bytes) + + def bench(): + req.issuer_name_hash + req.issuer_key_hash + req.hash_algorithm + req.serial_number + + benchmark(bench) + + +def test_ocsp_response_properties(benchmark): + resp_bytes = load_vectors_from_file( + os.path.join("x509", "ocsp", "resp-sha256.der"), + loader=lambda f: f.read(), + mode="rb", + ) + resp = ocsp.load_der_ocsp_response(resp_bytes) + + def bench(): + resp.issuer_key_hash + resp.serial_number + resp.signature_hash_algorithm + + benchmark(bench) + + def test_verify_docs_python_org(benchmark, pytestconfig): limbo_root = pytestconfig.getoption("--x509-limbo-root", skip=True) with open(os.path.join(limbo_root, "limbo.json"), "rb") as f: