| #!/usr/bin/env python3 |
| |
| """ |
| Generates test cases that aim to validate name constraints and other |
| name-related parts of webpki. |
| |
| Run this script from tests/. It edits the bottom part of some .rs files and |
| drops testcase data into subdirectories as required. |
| """ |
| import argparse |
| import os |
| from typing import TextIO, Optional, Union, Any, Callable, Iterable, List |
| from pathlib import Path |
| |
| from cryptography import x509 |
| from cryptography.hazmat.primitives import hashes |
| from cryptography.hazmat.primitives.asymmetric import rsa, ec, ed25519, padding |
| from cryptography.hazmat.primitives.serialization import Encoding |
| from cryptography.hazmat.backends import default_backend |
| from cryptography.x509.oid import NameOID, ExtendedKeyUsageOID |
| import ipaddress |
| import datetime |
| import subprocess |
| |
# One RSA key pair is generated per run. The private half is the default
# signing key in `end_entity_cert` and is passed as the subject key of the CA
# certificates built by the test generators in this file.
ROOT_PRIVATE_KEY: rsa.RSAPrivateKey = rsa.generate_private_key(
    public_exponent=65537, key_size=2048, backend=default_backend()
)
ROOT_PUBLIC_KEY: rsa.RSAPublicKey = ROOT_PRIVATE_KEY.public_key()

# Fixed validity window: 30 seconds either side of Unix timestamp 0x1FEDF00D.
# `datetime.utcfromtimestamp` is deprecated since Python 3.12, so build an
# aware UTC datetime and drop the tzinfo, which yields the same naive-UTC
# values as before.
NOT_BEFORE: datetime.datetime = datetime.datetime.fromtimestamp(
    0x1FEDF00D - 30, tz=datetime.timezone.utc
).replace(tzinfo=None)
NOT_AFTER: datetime.datetime = datetime.datetime.fromtimestamp(
    0x1FEDF00D + 30, tz=datetime.timezone.utc
).replace(tzinfo=None)

# Type aliases for the key kinds handled by the helpers in this file.
ANY_PRIV_KEY = Union[
    ed25519.Ed25519PrivateKey, ec.EllipticCurvePrivateKey, rsa.RSAPrivateKey
]
ANY_PUB_KEY = Union[
    ed25519.Ed25519PublicKey, ec.EllipticCurvePublicKey, rsa.RSAPublicKey
]
SIGNER = Callable[
    [Any, bytes], Any
]  # Note: a bit loosey-goosey here but good enough for tests.
| |
| |
def trim_top(file_name: str) -> TextIO:
    """
    Truncates `file_name` to its hand-written "top" and returns the file,
    opened for writing, so generated content can be appended.

    The top is every line up to and including the marker line
    `// DO NOT EDIT BELOW: generated by tests/generate.py`. The caller owns
    the returned file object and must close it (for example by using it in a
    `with` statement).

    Raises `ValueError` if the marker line is not present; the file is left
    untouched in that case because it is only reopened for writing once the
    marker has been found.
    """
    marker = "// DO NOT EDIT BELOW: generated by tests/generate.py\n"

    with open(file_name, "r") as f:
        lines = f.readlines()

    try:
        top = lines[: lines.index(marker) + 1]
    except ValueError:
        raise ValueError(
            f"{file_name}: marker line {marker.rstrip()!r} not found"
        ) from None

    output = open(file_name, "w")
    try:
        output.writelines(top)
    except BaseException:
        # Do not leak the handle if the header cannot be written back.
        output.close()
        raise
    return output
| |
| |
def key_or_generate(key: Optional[ANY_PRIV_KEY] = None) -> ANY_PRIV_KEY:
    """
    Returns `key` unchanged when one is supplied; otherwise generates and
    returns a fresh 2048-bit RSA private key with public exponent 65537.
    """
    if key is not None:
        return key

    return rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )
| |
| |
def write_der(path: str, content: bytes, force: bool) -> None:
    """
    Writes `content` to `path` in binary mode.

    A file that already exists is left untouched unless `force` is set; this
    avoids churn from regenerating resources that are already on disk.
    """
    target = Path(path)
    if force or not target.exists():
        with target.open("wb") as sink:
            sink.write(content)
| |
| |
def end_entity_cert(
    *,
    subject_name: x509.Name,
    issuer_name: x509.Name,
    issuer_key: Optional[ANY_PRIV_KEY] = None,
    subject_key: Optional[ANY_PRIV_KEY] = None,
    sans: Optional[Iterable[x509.GeneralName]] = None,
    ekus: Optional[Iterable[x509.ObjectIdentifier]] = None,
    serial: Optional[int] = None,
) -> x509.Certificate:
    """
    Builds and signs a non-CA (end-entity) certificate.

    - `subject_name` / `issuer_name`: names placed in the certificate.
    - `issuer_key`: key used to sign; `ROOT_PRIVATE_KEY` when omitted.
    - `subject_key`: key whose public half is certified; a fresh key from
      `key_or_generate` when omitted.
    - `sans`: contents of the subjectAltName extension; the extension is
      omitted when this is falsy.
    - `ekus`: contents of the extendedKeyUsage extension; the extension is
      omitted when this is falsy.
    - `serial`: serial number; a random one is used when omitted.

    The certificate is valid from `NOT_BEFORE` to `NOT_AFTER`, always carries
    a critical basicConstraints extension with `ca=False`, and is signed with
    SHA-256.
    """
    public_key: ANY_PUB_KEY = key_or_generate(subject_key).public_key()
    serial_number = x509.random_serial_number() if serial is None else serial

    builder: x509.CertificateBuilder = (
        x509.CertificateBuilder()
        .subject_name(subject_name)
        .issuer_name(issuer_name)
        .not_valid_before(NOT_BEFORE)
        .not_valid_after(NOT_AFTER)
        .serial_number(serial_number)
        .public_key(public_key)
    )

    # Extensions are added in a fixed order: SAN, EKU, then basicConstraints.
    if sans:
        builder = builder.add_extension(
            x509.SubjectAlternativeName(sans), critical=False
        )
    if ekus:
        builder = builder.add_extension(x509.ExtendedKeyUsage(ekus), critical=False)
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None),
        critical=True,
    )

    signing_key = ROOT_PRIVATE_KEY if issuer_key is None else issuer_key
    return builder.sign(
        private_key=signing_key,
        algorithm=hashes.SHA256(),
        backend=default_backend(),
    )
| |
| |
def subject_name_for_test(subject_cn: str, test_name: str) -> x509.Name:
    """
    Returns a subject name whose common name is `subject_cn` and whose
    organization name is `test_name`.
    """
    common_name = x509.NameAttribute(NameOID.COMMON_NAME, subject_cn)
    organization = x509.NameAttribute(NameOID.ORGANIZATION_NAME, test_name)
    return x509.Name([common_name, organization])
| |
| |
def issuer_name_for_test(test_name: str) -> x509.Name:
    """
    Returns an issuer name with the fixed common name `issuer.example.com` and
    `test_name` as its organization name.
    """
    common_name = x509.NameAttribute(NameOID.COMMON_NAME, "issuer.example.com")
    organization = x509.NameAttribute(NameOID.ORGANIZATION_NAME, test_name)
    return x509.Name([common_name, organization])
| |
| |
def ca_cert(
    *,
    subject_name: x509.Name,
    subject_key: Optional[ANY_PRIV_KEY] = None,
    issuer_name: Optional[x509.Name] = None,
    issuer_key: Optional[ANY_PRIV_KEY] = None,
    permitted_subtrees: Optional[Iterable[x509.GeneralName]] = None,
    excluded_subtrees: Optional[Iterable[x509.GeneralName]] = None,
    key_usage: Optional[x509.KeyUsage] = None,
) -> x509.Certificate:
    """
    Builds and signs a CA certificate.

    - `subject_name`: subject of the certificate.
    - `subject_key`: key whose public half is certified; a fresh key from
      `key_or_generate` when omitted.
    - `issuer_name`: issuer of the certificate; defaults to `subject_name`.
    - `issuer_key`: key used to sign; defaults to the subject key. Leaving
      both `issuer_name` and `issuer_key` unset yields a self-signed
      certificate.
    - `permitted_subtrees` / `excluded_subtrees`: contents of a critical
      nameConstraints extension. The extension is omitted when both are empty
      or absent; an empty side is encoded as absent.
    - `key_usage`: added as a critical keyUsage extension when supplied.

    The certificate is valid from `NOT_BEFORE` to `NOT_AFTER`, carries a
    critical basicConstraints extension with `ca=True`, has a random serial
    number, and is signed with SHA-256.
    """
    subject_priv_key = key_or_generate(subject_key)
    subject_key_pub: ANY_PUB_KEY = subject_priv_key.public_key()

    # `x509.NameConstraints` raises ValueError for an empty list, so an empty
    # iterable is normalised to "absent" (None) before it gets there.
    permitted = list(permitted_subtrees) if permitted_subtrees is not None else []
    excluded = list(excluded_subtrees) if excluded_subtrees is not None else []

    ca_builder: x509.CertificateBuilder = x509.CertificateBuilder()
    ca_builder = ca_builder.subject_name(subject_name)
    ca_builder = ca_builder.issuer_name(
        issuer_name if issuer_name is not None else subject_name
    )
    ca_builder = ca_builder.not_valid_before(NOT_BEFORE)
    ca_builder = ca_builder.not_valid_after(NOT_AFTER)
    ca_builder = ca_builder.serial_number(x509.random_serial_number())
    ca_builder = ca_builder.public_key(subject_key_pub)
    ca_builder = ca_builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=None),
        critical=True,
    )
    if permitted or excluded:
        ca_builder = ca_builder.add_extension(
            x509.NameConstraints(permitted or None, excluded or None),
            critical=True,
        )
    if key_usage is not None:
        ca_builder = ca_builder.add_extension(
            key_usage,
            critical=True,
        )

    return ca_builder.sign(
        private_key=issuer_key if issuer_key is not None else subject_priv_key,
        algorithm=hashes.SHA256(),
        backend=default_backend(),
    )
| |
| |
def generate_tls_server_cert_test(
    output: TextIO,
    test_name: str,
    expected_error: Optional[str] = None,
    subject_common_name: Optional[str] = None,
    extra_subject_names: Optional[List[x509.NameAttribute]] = None,
    valid_names: Optional[List[str]] = None,
    invalid_names: Optional[List[str]] = None,
    sans: Optional[Iterable[x509.GeneralName]] = None,
    permitted_subtrees: Optional[Iterable[x509.GeneralName]] = None,
    excluded_subtrees: Optional[Iterable[x509.GeneralName]] = None,
    force: bool = False,
) -> None:
    """
    Generate one test case: print a rust '#[test]' function to `output` and
    write the end-entity and issuer certificates it includes into the
    `tls_server_certs` directory (created if missing).

    - `output`: file object the rust test function is printed to.
    - `test_name`: name of the test, must be a rust identifier. Also used in
      the certificate names and the `.der` file names.
    - `expected_error`: item in the `webpki::Error` enum that the generated
      test expects `check_cert` to return. Leave absent to expect `Ok(())`.
    - `subject_common_name`: optional string to put in the end-entity
      certificate subject common name.
    - `extra_subject_names`: optional sequence of `x509.NameAttribute`s to
      append to the end-entity certificate subject.
    - `valid_names`: optional sequence of names the end-entity certificate is
      expected to be valid for; passed to `check_cert` as its third argument.
    - `invalid_names`: optional sequence of names the end-entity certificate
      is expected not to be valid for; passed to `check_cert` as its fourth
      argument.
    - `sans`: optional sequence of `x509.GeneralName`s that are the contents
      of the subjectAltName extension. If empty or not provided the
      end-entity certificate does not have a subjectAltName extension.
    - `permitted_subtrees`: optional sequence of `x509.GeneralName`s passed
      to `ca_cert` as the `permittedSubtrees` of the issuer certificate's
      `nameConstraints` extension.
    - `excluded_subtrees`: optional sequence of `x509.GeneralName`s passed to
      `ca_cert` as the `excludedSubtrees` of the issuer certificate's
      `nameConstraints` extension.
    - `force`: overwrite `.der` files that already exist.
    """
    issuer_name: x509.Name = issuer_name_for_test(test_name)

    # end-entity: optional CN first, then the test name as O, then any extras.
    subject_attrs: List[x509.NameAttribute] = []
    if subject_common_name:
        subject_attrs.append(
            x509.NameAttribute(NameOID.COMMON_NAME, subject_common_name)
        )
    subject_attrs.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, test_name))
    if extra_subject_names is not None:
        subject_attrs += extra_subject_names

    ee_certificate: x509.Certificate = end_entity_cert(
        subject_name=x509.Name(subject_attrs),
        issuer_name=issuer_name,
        sans=sans,
    )

    output_dir = "tls_server_certs"
    ee_cert_path = os.path.join(output_dir, f"{test_name}.ee.der")
    ca_cert_path = os.path.join(output_dir, f"{test_name}.ca.der")
    os.makedirs(output_dir, exist_ok=True)

    write_der(ee_cert_path, ee_certificate.public_bytes(Encoding.DER), force)

    # issuer: self-signed with the root key that also signed the end-entity.
    issuer_certificate: x509.Certificate = ca_cert(
        subject_name=issuer_name,
        subject_key=ROOT_PRIVATE_KEY,
        permitted_subtrees=permitted_subtrees,
        excluded_subtrees=excluded_subtrees,
    )
    write_der(ca_cert_path, issuer_certificate.public_bytes(Encoding.DER), force)

    if expected_error is None:
        expected = "Ok(())"
    else:
        expected = "Err(webpki::Error::" + expected_error + ")"

    def _quoted(names: Optional[List[str]]) -> str:
        # Render names as a comma-separated list of rust string literals.
        return ", ".join(f'"{name}"' for name in (names or []))

    rust_test = """
#[test]
fn %(test_name)s() {
    let ee = include_bytes!("%(ee_cert_path)s");
    let ca = include_bytes!("%(ca_cert_path)s");
    assert_eq!(
        check_cert(ee, ca, &[%(valid_names_str)s], &[%(invalid_names_str)s]),
        %(expected)s
    );
}""" % {
        "test_name": test_name,
        "ee_cert_path": ee_cert_path,
        "ca_cert_path": ca_cert_path,
        "valid_names_str": _quoted(valid_names),
        "invalid_names_str": _quoted(invalid_names),
        "expected": expected,
    }
    print(rust_test, file=output)
| |
| |
def tls_server_certs(force: bool) -> None:
    """
    Regenerates the generated part of `tls_server_certs.rs` together with the
    certificates each generated test includes.

    - `force`: forwarded to every `generate_tls_server_cert_test` call so
      that `.der` files already on disk are overwritten when requested.
    """
    with trim_top("tls_server_certs.rs") as output:

        def emit(test_name: str, **kwargs: Any) -> None:
            # Bind the shared output file and the caller's `force` flag once,
            # so no individual test case can forget to pass them on.
            generate_tls_server_cert_test(output, test_name, force=force, **kwargs)

        emit(
            "no_name_constraints",
            subject_common_name="subject.example.com",
            valid_names=["dns.example.com"],
            invalid_names=["subject.example.com"],
            sans=[x509.DNSName("dns.example.com")],
        )

        emit(
            "additional_dns_labels",
            subject_common_name="subject.example.com",
            valid_names=["host1.example.com", "host2.example.com"],
            invalid_names=["subject.example.com"],
            sans=[x509.DNSName("host1.example.com"), x509.DNSName("host2.example.com")],
            permitted_subtrees=[x509.DNSName(".example.com")],
        )

        emit(
            "disallow_subject_common_name",
            expected_error="NameConstraintViolation",
            subject_common_name="disallowed.example.com",
            excluded_subtrees=[x509.DNSName("disallowed.example.com")],
        )
        emit(
            "disallow_dns_san",
            expected_error="NameConstraintViolation",
            sans=[x509.DNSName("disallowed.example.com")],
            excluded_subtrees=[x509.DNSName("disallowed.example.com")],
        )

        emit(
            "allow_subject_common_name",
            subject_common_name="allowed.example.com",
            invalid_names=["allowed.example.com"],
            permitted_subtrees=[x509.DNSName("allowed.example.com")],
        )
        emit(
            "allow_dns_san",
            valid_names=["allowed.example.com"],
            sans=[x509.DNSName("allowed.example.com")],
            permitted_subtrees=[x509.DNSName("allowed.example.com")],
        )
        emit(
            "allow_dns_san_and_subject_common_name",
            valid_names=["allowed-san.example.com"],
            invalid_names=["allowed-cn.example.com"],
            sans=[x509.DNSName("allowed-san.example.com")],
            subject_common_name="allowed-cn.example.com",
            permitted_subtrees=[
                x509.DNSName("allowed-san.example.com"),
                x509.DNSName("allowed-cn.example.com"),
            ],
        )
        emit(
            "allow_dns_san_and_disallow_subject_common_name",
            expected_error="NameConstraintViolation",
            sans=[x509.DNSName("allowed-san.example.com")],
            subject_common_name="disallowed-cn.example.com",
            permitted_subtrees=[x509.DNSName("allowed-san.example.com")],
            excluded_subtrees=[x509.DNSName("disallowed-cn.example.com")],
        )
        emit(
            "disallow_dns_san_and_allow_subject_common_name",
            expected_error="NameConstraintViolation",
            sans=[
                x509.DNSName("allowed-san.example.com"),
                x509.DNSName("disallowed-san.example.com"),
            ],
            subject_common_name="allowed-cn.example.com",
            permitted_subtrees=[
                x509.DNSName("allowed-san.example.com"),
                x509.DNSName("allowed-cn.example.com"),
            ],
            excluded_subtrees=[x509.DNSName("disallowed-san.example.com")],
        )

        # XXX: ideally this test case would be a negative one, because the name constraints
        #      should apply to the subject name.
        #      however, because we don't look at email addresses in subjects, it is accepted.
        # NOTE(review): the address literal here had been replaced by an
        # e-mail obfuscation placeholder ("[email protected]"). The value
        # below is a reconstruction chosen to lie outside the permitted
        # `example.com` subtree, as the comment above requires -- confirm
        # against the committed fixture.
        emit(
            "we_incorrectly_ignore_name_constraints_on_name_in_subject",
            extra_subject_names=[
                x509.NameAttribute(NameOID.EMAIL_ADDRESS, "joe@notexample.com")
            ],
            permitted_subtrees=[x509.RFC822Name("example.com")],
        )

        # this does work, however, because we process all SANs
        # NOTE(review): same obfuscation placeholder as above; it is not a
        # valid RFC 822 address, so `x509.RFC822Name` rejects it. The value
        # below is a reconstruction -- confirm against the committed fixture.
        emit(
            "reject_constraints_on_unimplemented_names",
            expected_error="NameConstraintViolation",
            sans=[x509.RFC822Name("joe@example.com")],
            permitted_subtrees=[x509.RFC822Name("example.com")],
        )

        # RFC5280 4.2.1.10:
        #   "If no name of the type is in the certificate,
        #    the certificate is acceptable."
        emit(
            "we_ignore_constraints_on_names_that_do_not_appear_in_cert",
            sans=[x509.DNSName("notexample.com")],
            valid_names=["notexample.com"],
            invalid_names=["example.com"],
            permitted_subtrees=[x509.RFC822Name("example.com")],
        )

        emit(
            "wildcard_san_accepted_if_in_subtree",
            sans=[x509.DNSName("*.example.com")],
            valid_names=["bob.example.com", "jane.example.com"],
            invalid_names=["example.com", "uh.oh.example.com"],
            permitted_subtrees=[x509.DNSName("example.com")],
        )

        emit(
            "wildcard_san_rejected_if_in_excluded_subtree",
            expected_error="NameConstraintViolation",
            sans=[x509.DNSName("*.example.com")],
            excluded_subtrees=[x509.DNSName("example.com")],
        )

        emit(
            "ip4_address_san_rejected_if_in_excluded_subtree",
            expected_error="NameConstraintViolation",
            sans=[x509.IPAddress(ipaddress.ip_address("12.34.56.78"))],
            excluded_subtrees=[x509.IPAddress(ipaddress.ip_network("12.34.56.0/24"))],
        )

        emit(
            "ip4_address_san_allowed_if_outside_excluded_subtree",
            valid_names=["12.34.56.78"],
            sans=[x509.IPAddress(ipaddress.ip_address("12.34.56.78"))],
            excluded_subtrees=[x509.IPAddress(ipaddress.ip_network("12.34.56.252/30"))],
        )

        # Overwrite the netmask after construction to obtain a
        # non-contiguous ("sparse") mask for the next test case.
        sparse_net_addr = ipaddress.ip_network("12.34.56.78/24", strict=False)
        sparse_net_addr.netmask = ipaddress.ip_address("255.255.255.1")
        emit(
            "ip4_address_san_rejected_if_excluded_is_sparse_cidr_mask",
            expected_error="InvalidNetworkMaskConstraint",
            sans=[
                # inside excluded network, if netmask is allowed to be sparse
                x509.IPAddress(ipaddress.ip_address("12.34.56.79")),
            ],
            excluded_subtrees=[x509.IPAddress(sparse_net_addr)],
        )

        emit(
            "ip4_address_san_allowed",
            valid_names=["12.34.56.78"],
            invalid_names=[
                "12.34.56.77",
                "12.34.56.79",
                # IPv4-mapped IPv6 spelling of 12.34.56.78 (0c.22.38.4e).
                "0000:0000:0000:0000:0000:ffff:0c22:384e",
            ],
            sans=[x509.IPAddress(ipaddress.ip_address("12.34.56.78"))],
            permitted_subtrees=[x509.IPAddress(ipaddress.ip_network("12.34.56.0/24"))],
        )

        emit(
            "ip6_address_san_rejected_if_in_excluded_subtree",
            expected_error="NameConstraintViolation",
            sans=[x509.IPAddress(ipaddress.ip_address("2001:db8::1"))],
            excluded_subtrees=[x509.IPAddress(ipaddress.ip_network("2001:db8::/48"))],
        )

        emit(
            "ip6_address_san_allowed_if_outside_excluded_subtree",
            valid_names=["2001:0db9:0000:0000:0000:0000:0000:0001"],
            sans=[x509.IPAddress(ipaddress.ip_address("2001:db9::1"))],
            excluded_subtrees=[x509.IPAddress(ipaddress.ip_network("2001:db8::/48"))],
        )

        emit(
            "ip6_address_san_allowed",
            valid_names=["2001:0db9:0000:0000:0000:0000:0000:0001"],
            invalid_names=["12.34.56.78"],
            sans=[x509.IPAddress(ipaddress.ip_address("2001:db9::1"))],
            permitted_subtrees=[x509.IPAddress(ipaddress.ip_network("2001:db9::/48"))],
        )

        emit(
            "ip46_mixed_address_san_allowed",
            valid_names=["12.34.56.78", "2001:0db9:0000:0000:0000:0000:0000:0001"],
            invalid_names=[
                "12.34.56.77",
                "12.34.56.79",
                # IPv4-mapped IPv6 spelling of 12.34.56.78 (0c.22.38.4e).
                "0000:0000:0000:0000:0000:ffff:0c22:384e",
            ],
            sans=[
                x509.IPAddress(ipaddress.ip_address("12.34.56.78")),
                x509.IPAddress(ipaddress.ip_address("2001:db9::1")),
            ],
            permitted_subtrees=[
                x509.IPAddress(ipaddress.ip_network("12.34.56.0/24")),
                x509.IPAddress(ipaddress.ip_network("2001:db9::/48")),
            ],
        )

        emit(
            "permit_directory_name_not_implemented",
            expected_error="NameConstraintViolation",
            permitted_subtrees=[
                x509.DirectoryName(
                    x509.Name([x509.NameAttribute(NameOID.COUNTRY_NAME, "CN")])
                )
            ],
        )

        emit(
            "exclude_directory_name_not_implemented",
            expected_error="NameConstraintViolation",
            excluded_subtrees=[
                x509.DirectoryName(
                    x509.Name([x509.NameAttribute(NameOID.COUNTRY_NAME, "CN")])
                )
            ],
        )

        emit(
            "invalid_dns_name_matching",
            valid_names=["dns.example.com"],
            subject_common_name="subject.example.com",
            sans=[
                x509.DNSName("{invalid}.example.com"),
                x509.DNSName("dns.example.com"),
            ],
        )
| |
| |
def signatures(force: bool) -> None:
    """
    Regenerates the generated part of `signatures.rs` and the files under
    `signatures/` that the generated tests include.

    A private key is generated for every entry of `all_key_types` and an
    end-entity certificate is written for each. For every key type listed in
    `webpki_algs`, and every algorithm listed for it, two tests are emitted:
    one with a signature over the shared message (expected `Ok(())`) and one
    with a signature over a different message (expected
    `InvalidSignatureForPublicKey`). Each such key type also gets a test that
    the remaining algorithms report
    `UnsupportedSignatureAlgorithmForPublicKey`.

    - `force`: overwrite files that already exist on disk.
    """
    rsa_pub_exponent: int = 0x10001
    backend: Any = default_backend()
    all_key_types: dict[str, ANY_PRIV_KEY] = {
        "ed25519": ed25519.Ed25519PrivateKey.generate(),
        "ecdsa_p256": ec.generate_private_key(ec.SECP256R1(), backend),
        "ecdsa_p384": ec.generate_private_key(ec.SECP384R1(), backend),
        "ecdsa_p521_not_supported": ec.generate_private_key(ec.SECP521R1(), backend),
        "rsa_1024_not_supported": rsa.generate_private_key(
            rsa_pub_exponent, 1024, backend
        ),
        "rsa_2048": rsa.generate_private_key(rsa_pub_exponent, 2048, backend),
        "rsa_3072": rsa.generate_private_key(rsa_pub_exponent, 3072, backend),
        "rsa_4096": rsa.generate_private_key(rsa_pub_exponent, 4096, backend),
    }

    rsa_types: list[str] = [
        "RSA_PKCS1_2048_8192_SHA256",
        "RSA_PKCS1_2048_8192_SHA384",
        "RSA_PKCS1_2048_8192_SHA512",
        "RSA_PSS_2048_8192_SHA256_LEGACY_KEY",
        "RSA_PSS_2048_8192_SHA384_LEGACY_KEY",
        "RSA_PSS_2048_8192_SHA512_LEGACY_KEY",
    ]

    # Key type -> webpki algorithm names expected to work with it. Dict and
    # list order determine the order of the generated tests.
    webpki_algs: dict[str, Iterable[str]] = {
        "ed25519": ["ED25519"],
        "ecdsa_p256": ["ECDSA_P256_SHA384", "ECDSA_P256_SHA256"],
        "ecdsa_p384": ["ECDSA_P384_SHA384", "ECDSA_P384_SHA256"],
        "rsa_2048": rsa_types,
        "rsa_3072": rsa_types + ["RSA_PKCS1_3072_8192_SHA384"],
        "rsa_4096": rsa_types + ["RSA_PKCS1_3072_8192_SHA384"],
    }

    # Algorithm that the 2048-bit RSA key is expected to be rejected by even
    # though the signature itself is good; it gets its own test at the end.
    rsa_3072_only_alg = "RSA_PKCS1_3072_8192_SHA384"

    pss_sha256: padding.PSS = padding.PSS(
        mgf=padding.MGF1(hashes.SHA256()), salt_length=32
    )
    pss_sha384: padding.PSS = padding.PSS(
        mgf=padding.MGF1(hashes.SHA384()), salt_length=48
    )
    pss_sha512: padding.PSS = padding.PSS(
        mgf=padding.MGF1(hashes.SHA512()), salt_length=64
    )

    # webpki algorithm name -> callable producing a signature with that scheme.
    how_to_sign: dict[str, SIGNER] = {
        "ED25519": lambda key, message: key.sign(message),
        "ECDSA_P256_SHA256": lambda key, message: key.sign(
            message, ec.ECDSA(hashes.SHA256())
        ),
        "ECDSA_P256_SHA384": lambda key, message: key.sign(
            message, ec.ECDSA(hashes.SHA384())
        ),
        "ECDSA_P384_SHA256": lambda key, message: key.sign(
            message, ec.ECDSA(hashes.SHA256())
        ),
        "ECDSA_P384_SHA384": lambda key, message: key.sign(
            message, ec.ECDSA(hashes.SHA384())
        ),
        "RSA_PKCS1_2048_8192_SHA256": lambda key, message: key.sign(
            message, padding.PKCS1v15(), hashes.SHA256()
        ),
        "RSA_PKCS1_2048_8192_SHA384": lambda key, message: key.sign(
            message, padding.PKCS1v15(), hashes.SHA384()
        ),
        "RSA_PKCS1_2048_8192_SHA512": lambda key, message: key.sign(
            message, padding.PKCS1v15(), hashes.SHA512()
        ),
        "RSA_PKCS1_3072_8192_SHA384": lambda key, message: key.sign(
            message, padding.PKCS1v15(), hashes.SHA384()
        ),
        "RSA_PSS_2048_8192_SHA256_LEGACY_KEY": lambda key, message: key.sign(
            message, pss_sha256, hashes.SHA256()
        ),
        "RSA_PSS_2048_8192_SHA384_LEGACY_KEY": lambda key, message: key.sign(
            message, pss_sha384, hashes.SHA384()
        ),
        "RSA_PSS_2048_8192_SHA512_LEGACY_KEY": lambda key, message: key.sign(
            message, pss_sha512, hashes.SHA512()
        ),
    }

    output_dir: str = "signatures"
    if not os.path.isdir(output_dir):
        os.mkdir(output_dir)

    message = b"hello world!"
    message_path: str = os.path.join(output_dir, "message.bin")
    write_der(message_path, message, force)

    def _cert_path(cert_type: str) -> str:
        return os.path.join(output_dir, f"{cert_type}.ee.der")

    for name, private_key in all_key_types.items():
        ee_subject = x509.Name(
            [x509.NameAttribute(NameOID.ORGANIZATION_NAME, name + " test")]
        )
        issuer_subject = x509.Name(
            [x509.NameAttribute(NameOID.ORGANIZATION_NAME, name + " issuer")]
        )
        certificate: x509.Certificate = end_entity_cert(
            subject_name=ee_subject,
            subject_key=private_key,
            issuer_name=issuer_subject,
        )

        write_der(_cert_path(name), certificate.public_bytes(Encoding.DER), force)

    # NB: the helpers below print to `output`, which is only bound once the
    # `with trim_top(...)` block further down has been entered.

    def _test(
        test_name: str, cert_type: str, algorithm: str, signature: bytes, expected: str
    ) -> None:
        # Write the signature file and emit one rust test that checks it.
        cert_path: str = _cert_path(cert_type)
        lower_test_name: str = test_name.lower()

        sig_path: str = os.path.join(output_dir, f"{lower_test_name}.sig.bin")
        write_der(sig_path, signature, force)

        print(
            """
#[test]
#[cfg(feature = "alloc")]
fn %(lower_test_name)s() {
    let ee = include_bytes!("%(cert_path)s");
    let message = include_bytes!("%(message_path)s");
    let signature = include_bytes!("%(sig_path)s");
    assert_eq!(
        check_sig(ee, &webpki::%(algorithm)s, message, signature),
        %(expected)s
    );
}"""
            % {
                "lower_test_name": lower_test_name,
                "cert_path": cert_path,
                "message_path": message_path,
                "sig_path": sig_path,
                "algorithm": algorithm,
                "expected": expected,
            },
            file=output,
        )

    def good_signature(
        test_name: str, cert_type: str, algorithm: str, signer: SIGNER
    ) -> None:
        # Signature over the real message; verification must succeed.
        signature: bytes = signer(all_key_types[cert_type], message)
        _test(test_name, cert_type, algorithm, signature, expected="Ok(())")

    def good_signature_but_rejected(
        test_name: str, cert_type: str, algorithm: str, signer: SIGNER
    ) -> None:
        # Signature over the real message, but the test expects rejection.
        signature: bytes = signer(all_key_types[cert_type], message)
        _test(
            test_name,
            cert_type,
            algorithm,
            signature,
            expected="Err(webpki::Error::InvalidSignatureForPublicKey)",
        )

    def bad_signature(
        test_name: str, cert_type: str, algorithm: str, signer: SIGNER
    ) -> None:
        # Signature over a *different* message; verification must fail.
        signature: bytes = signer(all_key_types[cert_type], message + b"?")
        _test(
            test_name,
            cert_type,
            algorithm,
            signature,
            expected="Err(webpki::Error::InvalidSignatureForPublicKey)",
        )

    def bad_algorithms_for_key(
        test_name: str, cert_type: str, unusable_algs: set[str]
    ) -> None:
        # Emit one rust test looping over every algorithm the key type must
        # not be usable with; sorted so the generated file is stable.
        cert_path: str = _cert_path(cert_type)
        test_name_lower: str = test_name.lower()
        unusable_algs_str: str = ", ".join(
            "&webpki::" + alg for alg in sorted(unusable_algs)
        )
        print(
            """
#[test]
#[cfg(feature = "alloc")]
fn %(test_name_lower)s() {
    let ee = include_bytes!("%(cert_path)s");
    for algorithm in &[ %(unusable_algs_str)s ] {
        assert_eq!(
            check_sig(ee, algorithm, b"", b""),
            Err(webpki::Error::UnsupportedSignatureAlgorithmForPublicKey)
        );
    }
}"""
            % {
                "test_name_lower": test_name_lower,
                "cert_path": cert_path,
                "unusable_algs_str": unusable_algs_str,
            },
            file=output,
        )

    with trim_top("signatures.rs") as output:
        # compute all webpki algorithms covered by these tests
        all_webpki_algs: set[str] = {
            item for algs in webpki_algs.values() for item in algs
        }

        for key_type, algs in webpki_algs.items():
            for alg in algs:
                signer: SIGNER = how_to_sign[alg]
                good_signature(
                    key_type + "_key_and_" + alg + "_good_signature",
                    cert_type=key_type,
                    algorithm=alg,
                    signer=signer,
                )
                bad_signature(
                    key_type + "_key_and_" + alg + "_detects_bad_signature",
                    cert_type=key_type,
                    algorithm=alg,
                    signer=signer,
                )

            unusable_algs = all_webpki_algs.difference(algs)

            # special case: tested separately below
            if key_type == "rsa_2048":
                unusable_algs.discard(rsa_3072_only_alg)

            bad_algorithms_for_key(
                key_type + "_key_rejected_by_other_algorithms",
                cert_type=key_type,
                unusable_algs=unusable_algs,
            )

        # Look the signer up explicitly instead of relying on whatever the
        # loop above happened to leave in `signer`.
        good_signature_but_rejected(
            "rsa_2048_key_rejected_by_RSA_PKCS1_3072_8192_SHA384",
            cert_type="rsa_2048",
            algorithm=rsa_3072_only_alg,
            signer=how_to_sign[rsa_3072_only_alg],
        )
| |
| |
def generate_client_auth_test(
    output: TextIO,
    test_name: str,
    ekus: Optional[Iterable[x509.ObjectIdentifier]],
    expected_error: Optional[str] = None,
    force: bool = False,
) -> None:
    """
    Generate one client-auth test case: print a rust '#[test]' function to
    `output` and write the end-entity and issuer certificates it includes
    into the `client_auth` directory (created if missing).

    - `output`: file object the rust test function is printed to.
    - `test_name`: name of the test, must be a rust identifier. Also used in
      the certificate names and the `.der` file names.
    - `ekus`: contents of the end-entity certificate's extendedKeyUsage
      extension, passed through to `end_entity_cert`.
    - `expected_error`: item in the `webpki::Error` enum that the generated
      test expects `check_cert` to return. Leave absent to expect `Ok(())`.
    - `force`: overwrite `.der` files that already exist.
    """
    issuer_name: x509.Name = issuer_name_for_test(test_name)

    # end-entity: the subject carries only the test name as its organization.
    organization = x509.NameAttribute(NameOID.ORGANIZATION_NAME, test_name)
    ee_certificate: x509.Certificate = end_entity_cert(
        subject_name=x509.Name([organization]),
        ekus=ekus,
        issuer_name=issuer_name,
    )

    output_dir = "client_auth"
    os.makedirs(output_dir, exist_ok=True)

    ee_cert_path = os.path.join(output_dir, f"{test_name}.ee.der")
    write_der(ee_cert_path, ee_certificate.public_bytes(Encoding.DER), force)

    # issuer: self-signed with the root key that also signed the end-entity.
    issuer_certificate: x509.Certificate = ca_cert(
        subject_name=issuer_name, subject_key=ROOT_PRIVATE_KEY
    )
    ca_cert_path = os.path.join(output_dir, f"{test_name}.ca.der")
    write_der(ca_cert_path, issuer_certificate.public_bytes(Encoding.DER), force)

    if expected_error is None:
        expected = "Ok(())"
    else:
        expected = "Err(webpki::Error::" + expected_error + ")"

    rust_test = """
#[test]
#[cfg(feature = "alloc")]
fn %(test_name)s() {
    let ee = include_bytes!("%(ee_cert_path)s");
    let ca = include_bytes!("%(ca_cert_path)s");
    assert_eq!(
        check_cert(ee, ca),
        %(expected)s
    );
}""" % {
        "test_name": test_name,
        "ee_cert_path": ee_cert_path,
        "ca_cert_path": ca_cert_path,
        "expected": expected,
    }
    print(rust_test, file=output)
| |
| |
def client_auth(force: bool) -> None:
    """
    Regenerates the generated part of `client_auth.rs` together with the
    certificates each generated test includes.

    - `force`: forwarded to every `generate_client_auth_test` call so that
      `.der` files already on disk are overwritten when requested.
    """
    with trim_top("client_auth.rs") as output:
        generate_client_auth_test(
            output,
            "cert_with_no_eku_accepted_for_client_auth",
            ekus=None,
            force=force,
        )
        generate_client_auth_test(
            output,
            "cert_with_clientauth_eku_accepted_for_client_auth",
            ekus=[ExtendedKeyUsageOID.CLIENT_AUTH],
            force=force,
        )
        generate_client_auth_test(
            output,
            "cert_with_both_ekus_accepted_for_client_auth",
            ekus=[ExtendedKeyUsageOID.CLIENT_AUTH, ExtendedKeyUsageOID.SERVER_AUTH],
            force=force,
        )
        generate_client_auth_test(
            output,
            "cert_with_serverauth_eku_rejected_for_client_auth",
            ekus=[ExtendedKeyUsageOID.SERVER_AUTH],
            expected_error="RequiredEkuNotFound",
            force=force,
        )
| |
| |
| def client_auth_revocation(force: bool) -> None: |
| output_dir: str = "client_auth_revocation" |
| if not os.path.isdir(output_dir): |
| os.mkdir(output_dir) |
| |
| # KeyUsage for a CA that sets crl_sign. |
| crl_sign_ku = x509.KeyUsage( |
| digital_signature=True, |
| key_cert_sign=True, |
| crl_sign=True, # NB: crl_sign set. |
| content_commitment=False, |
| key_encipherment=False, |
| data_encipherment=False, |
| key_agreement=False, |
| encipher_only=False, |
| decipher_only=False, |
| ) |
| |
| # KeyUsage for a CA that omits crl_sign. |
| no_crl_sign_ku = x509.KeyUsage( |
| digital_signature=True, |
| key_cert_sign=True, |
| crl_sign=False, # NB: no crl_sign. |
| content_commitment=False, |
| key_encipherment=False, |
| data_encipherment=False, |
| key_agreement=False, |
| encipher_only=False, |
| decipher_only=False, |
| ) |
| |
| def _chain( |
| *, chain_name: str, key_usage: Optional[x509.KeyUsage] |
| ) -> list[tuple[x509.Certificate, str, ANY_PRIV_KEY]]: |
| """ |
| Generate a short test certificate chain: |
| ee -> intermediate -> root. |
| |
| :param chain_name: the name of the certificate chain. Used to generate subject/issuer names and to |
| choose the filename to write DER contents to disk. |
| |
| :param key_usage: the KeyUsage to include in the issuer certificates (both the intermediate and the root). |
| |
| :return: Return a list comprising the chain starting from the end entity and ending at the root trust anchor. |
| Each entry in the list is a tuple of three values: the x509.Certificate object, the path the DER encoding |
| was written to, and lastly the corresponding private key. |
| """ |
| ee_subj: x509.Name = subject_name_for_test("test.example.com", chain_name) |
| int_a_subj: x509.Name = issuer_name_for_test(f"int.a.{chain_name}") |
| int_b_subj: x509.Name = issuer_name_for_test(f"int.b.{chain_name}") |
| ca_subj: x509.Name = issuer_name_for_test(f"ca.{chain_name}") |
| ee_key: ec.EllipticCurvePrivateKey = ec.generate_private_key( |
| ec.SECP256R1(), default_backend() |
| ) |
| int_a_key: ec.EllipticCurvePrivateKey = ec.generate_private_key( |
| ec.SECP256R1(), default_backend() |
| ) |
| int_b_key: ec.EllipticCurvePrivateKey = ec.generate_private_key( |
| ec.SECP256R1(), default_backend() |
| ) |
| root_key: ec.EllipticCurvePrivateKey = ec.generate_private_key( |
| ec.SECP256R1(), default_backend() |
| ) |
| |
| # EE cert issued by intermediate. |
| ee_cert: x509.Certificate = end_entity_cert( |
| subject_name=ee_subj, issuer_name=int_a_subj, issuer_key=int_a_key |
| ) |
| ee_cert_path: str = os.path.join(output_dir, f"{chain_name}.ee.der") |
| write_der(ee_cert_path, ee_cert.public_bytes(Encoding.DER), force) |
| |
| # Second EE cert issued by intermediate, with top-bit set in serial. |
| ee_cert_topbit: x509.Certificate = end_entity_cert( |
| subject_name=ee_subj, |
| issuer_name=int_a_subj, |
| issuer_key=int_a_key, |
| serial=0x80DEADBEEFF00D, |
| ) |
| ee_cert_topbit_path: str = os.path.join( |
| output_dir, f"{chain_name}.topbit.ee.der" |
| ) |
| write_der(ee_cert_topbit_path, ee_cert_topbit.public_bytes(Encoding.DER), force) |
| |
| # intermediate a cert issued by intermediate b. |
| int_a_cert: x509.Certificate = ca_cert( |
| subject_name=int_a_subj, |
| subject_key=int_a_key, |
| issuer_name=int_b_subj, |
| issuer_key=int_b_key, |
| key_usage=key_usage, |
| ) |
| int_a_cert_path: str = os.path.join(output_dir, f"{chain_name}.int.a.ca.der") |
| write_der(int_a_cert_path, int_a_cert.public_bytes(Encoding.DER), force) |
| |
| # intermediate b cert issued by root cert. |
| int_b_cert: x509.Certificate = ca_cert( |
| subject_name=int_b_subj, |
| subject_key=int_b_key, |
| issuer_name=ca_subj, |
| issuer_key=root_key, |
| key_usage=key_usage, |
| ) |
| int_b_cert_path: str = os.path.join(output_dir, f"{chain_name}.int.b.ca.der") |
| write_der(int_b_cert_path, int_b_cert.public_bytes(Encoding.DER), force) |
| |
| # root cert issued by itself. |
| root_cert: x509.Certificate = ca_cert( |
| subject_name=ca_subj, |
| subject_key=root_key, |
| key_usage=key_usage, |
| ) |
| root_cert_path: str = os.path.join(output_dir, f"{chain_name}.root.ca.der") |
| write_der(root_cert_path, root_cert.public_bytes(Encoding.DER), force) |
| |
| return [ |
| (ee_cert, ee_cert_path, ee_key), |
| (int_a_cert, int_a_cert_path, int_a_key), |
| (int_b_cert, int_b_cert_path, int_b_key), |
| (root_cert, root_cert_path, root_key), |
| (ee_cert_topbit, ee_cert_topbit_path, ee_key), |
| ] |
| |
def _crl(
    *,
    serials: Iterable[int],
    issuer_name: x509.Name,
    issuer_key: Optional[ANY_PRIV_KEY],
) -> x509.CertificateRevocationList:
    """
    Build and sign a certificate revocation list.

    Each serial is listed as revoked at NOT_BEFORE with a keyCompromise reason
    code. The CRL itself is valid from NOT_BEFORE to NOT_AFTER, carries a
    non-critical CRLNumber extension holding a random value, and is signed
    with SHA-256.

    :param serials: serial numbers to list in the CRL as revoked certificates.
    :param issuer_name: the name of the CRL issuer.
    :param issuer_key: the key used to sign the CRL (resolved through key_or_generate).
    :return: the signed x509.CertificateRevocationList.
    """
    signing_key: ANY_PRIV_KEY = key_or_generate(issuer_key)

    def _revoked_entry(serial_number: int) -> x509.RevokedCertificate:
        # One revoked-certificate entry; the reason extension is non-critical.
        return (
            x509.RevokedCertificateBuilder()
            .serial_number(serial_number)
            .revocation_date(NOT_BEFORE)
            .add_extension(
                x509.CRLReason(x509.ReasonFlags.key_compromise), critical=False
            )
            .build()
        )

    builder: x509.CertificateRevocationListBuilder = (
        x509.CertificateRevocationListBuilder()
        .issuer_name(issuer_name)
        .last_update(NOT_BEFORE)
        .next_update(NOT_AFTER)
    )
    for serial_number in serials:
        builder = builder.add_revoked_certificate(_revoked_entry(serial_number))
    builder = builder.add_extension(
        x509.CRLNumber(x509.random_serial_number()), critical=False
    )

    return builder.sign(private_key=signing_key, algorithm=hashes.SHA256())
| |
def _revocation_test(
    *,
    test_name: str,
    chain: list[tuple[x509.Certificate, str, ANY_PRIV_KEY]],
    crl_paths: Iterable[str],
    owned: bool,
    expected_error: Optional[str],
    ee_topbit_serial: bool = False,
) -> None:
    """
    Generate a Rust unit test for a revocation checking scenario and write it to the output file.

    The chain must hold exactly five (certificate, DER path, private key) entries in this order:
    end entity, intermediate A, intermediate B, root, and the end entity variant whose serial has
    the top bit set. Only the DER paths are used here.

    :param test_name: the name of the unit test.
    :param chain: the certificate chain to use for validation.
    :param crl_paths: paths to zero or more CRLs.
    :param owned: whether to use the owned or borrowed CRL representation. Owned tests are gated on
        the "alloc" feature.
    :param expected_error: an optional error to expect to be returned from validation.
    :param ee_topbit_serial: when True, validate the top-bit-serial end entity (chain[4]) instead of
        the regular end entity (chain[0]).
    :raises RuntimeError: if the chain does not contain exactly five entries.
    """
    if len(chain) != 5:
        raise RuntimeError("invalid chain length")

    # Certificate objects and keys are not needed to emit the test, only the paths.
    _, ee_cert_path, _ = chain[4] if ee_topbit_serial else chain[0]
    _, int_a_cert_path, _ = chain[1]
    _, int_b_cert_path, _ = chain[2]
    _, root_cert_path, _ = chain[3]

    int_a_str = f'include_bytes!("{int_a_cert_path}").as_slice()'
    int_b_str = f'include_bytes!("{int_b_cert_path}").as_slice()'
    intermediates_str: str = f"&[{int_a_str}, {int_b_str}]"
    owned_convert: str = ".to_owned().unwrap()" if owned else ""
    crl_includes: str = "\n".join(
        [
            f"""
&webpki::BorrowedCertRevocationList::from_der(include_bytes!("{path}").as_slice())
.unwrap()
{owned_convert}
as &dyn webpki::CertRevocationList,
"""
            for path in crl_paths
        ]
    )
    crls: str = f"&[{crl_includes}]"
    expected: str = (
        f"Err(webpki::Error::{expected_error})" if expected_error else "Ok(())"
    )
    feature_gate = '#[cfg(feature = "alloc")]' if owned else ""

    # Spell out every placeholder the template consumes instead of relying on locals(), so a
    # renamed or removed variable fails loudly here rather than silently breaking the template.
    template_values = {
        "feature_gate": feature_gate,
        "test_name": test_name,
        "ee_cert_path": ee_cert_path,
        "intermediates_str": intermediates_str,
        "root_cert_path": root_cert_path,
        "crls": crls,
        "expected": expected,
    }

    print(
        """
%(feature_gate)s
#[test]
fn %(test_name)s() {
let ee = include_bytes!("%(ee_cert_path)s");
let intermediates = %(intermediates_str)s;
let ca = include_bytes!("%(root_cert_path)s");
let crls = %(crls)s;
assert_eq!(check_cert(ee, intermediates, ca, crls), %(expected)s);
}
"""
        % template_values,
        file=output,
    )
| |
# Build three simple certificate chains that differ only in the key usage given to the issuers:
#   * no_ku_chain:     no key usage specified at all.
#   * no_crl_ku_chain: key usage specified, but without the cRLSign bit.
#   * crl_ku_chain:    key usage specified, including the cRLSign bit.
no_ku_chain, no_crl_ku_chain, crl_ku_chain = [
    _chain(chain_name=name, key_usage=usage)
    for name, usage in (
        ("no_ku_chain", None),
        ("no_crl_ku_chain", no_crl_sign_ku),
        ("ku_chain", crl_sign_ku),
    )
]
| |
def _no_crls_test_ee_depth() -> None:
    """Emit borrowed and owned tests validating the no-KU chain with an empty CRL list."""
    # With no CRLs provided, the EE cert is expected to verify without error.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name="no_crls_test_ee_depth" + suffix,
            chain=no_ku_chain,
            crl_paths=[],
            owned=use_owned,
            expected_error=None,
        )
| |
def _no_relevant_crl_ee_depth() -> None:
    """Emit tests where the only CRL lists the EE serial but comes from an unknown issuer."""
    test_name = "no_relevant_crl_ee_depth"
    end_entity = no_ku_chain[0][0]

    # The CRL names the EE cert's serial, yet its issuer name is unrelated to the chain.
    unrelated_crl = _crl(
        serials=[end_entity.serial_number],
        issuer_name=subject_name_for_test("whatev", test_name),
        issuer_key=None,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, unrelated_crl.public_bytes(Encoding.DER), force)

    # A CRL that isn't relevant to the EE cert must not stop it from verifying.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=no_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            expected_error=None,
        )
| |
def _ee_not_revoked_ee_depth() -> None:
    """Emit tests where the EE issuer's CRL is valid but does not list the EE serial."""
    test_name = "ee_not_revoked_ee_depth"
    end_entity = no_ku_chain[0][0]
    issuing_key = no_ku_chain[1][2]

    # Issued and signed by the EE cert's issuer, but revoking some other serial.
    other_serial_crl = _crl(
        serials=[12345],  # Deliberately not the EE cert's serial.
        issuer_name=end_entity.issuer,
        issuer_key=issuing_key,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, other_serial_crl.public_bytes(Encoding.DER), force)

    # The CRL is relevant and verifies, but the EE serial is absent, so validation should
    # succeed without error.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=no_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            expected_error=None,
        )
| |
def _ee_revoked_badsig_ee_depth() -> None:
    """Emit tests where the CRL revoking the EE cert carries a signature from the wrong key."""
    test_name = "ee_revoked_badsig_ee_depth"
    end_entity = no_ku_chain[0][0]

    # NB: the CRL claims the EE issuer's name but is signed with a freshly generated key rather
    # than int_a_key, so its signature cannot verify against the issuer's public key.
    unrelated_key: ec.EllipticCurvePrivateKey = ec.generate_private_key(
        ec.SECP256R1(), default_backend()
    )
    badsig_crl = _crl(
        serials=[end_entity.serial_number],
        issuer_name=end_entity.issuer,
        issuer_key=unrelated_key,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, badsig_crl.public_bytes(Encoding.DER), force)

    # A relevant CRL that lists the EE serial but fails signature verification should error.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=no_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            expected_error="InvalidCrlSignatureForPublicKey",
        )
| |
def _ee_revoked_wrong_ku_ee_depth() -> None:
    """Emit tests where the EE cert is revoked by an issuer whose key usage lacks cRLSign."""
    test_name = "ee_revoked_wrong_ku_ee_depth"
    end_entity = no_crl_ku_chain[0][0]
    issuing_key = no_crl_ku_chain[1][2]

    # The CRL lists the EE serial and is signed by the EE issuer, whose certificate specifies a
    # key usage that does not include the cRLSign bit.
    revoking_crl = _crl(
        serials=[end_entity.serial_number],
        issuer_name=end_entity.issuer,
        issuer_key=issuing_key,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, revoking_crl.public_bytes(Encoding.DER), force)

    # Validation should report that the CRL issuer is not permitted to sign CRLs.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=no_crl_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            expected_error="IssuerNotCrlSigner",
        )
| |
def _ee_not_revoked_wrong_ku_ee_depth() -> None:
    """Emit tests where a CRL not listing the EE serial comes from an issuer lacking cRLSign."""
    test_name = "ee_not_revoked_wrong_ku_ee_depth"
    end_entity = no_crl_ku_chain[0][0]
    issuing_key = no_crl_ku_chain[1][2]

    # Issued and signed by the EE cert's issuer, but revoking some other serial.
    other_serial_crl = _crl(
        serials=[12345],  # Deliberately not the EE cert's serial.
        issuer_name=end_entity.issuer,
        issuer_key=issuing_key,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, other_serial_crl.public_bytes(Encoding.DER), force)

    # Even though the CRL does not list the EE serial, it was issued by a CA whose key usage
    # doesn't include cRLSign, so validation should error indicating the CRL issuer can't sign
    # CRLs.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=no_crl_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            expected_error="IssuerNotCrlSigner",
        )
| |
def _ee_revoked_no_ku_ee_depth() -> None:
    """Emit tests where the EE cert is revoked by an issuer with no key usage specified."""
    test_name = "ee_revoked_no_ku_ee_depth"
    end_entity = no_ku_chain[0][0]
    issuing_key = no_ku_chain[1][2]

    # The CRL lists the EE serial and is signed by the EE issuer (which has no KU extension).
    revoking_crl = _crl(
        serials=[end_entity.serial_number],
        issuer_name=end_entity.issuer,
        issuer_key=issuing_key,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, revoking_crl.public_bytes(Encoding.DER), force)

    # The CRL is relevant, verifies and lists the EE serial: expect the cert to be reported
    # as revoked.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=no_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            expected_error="CertRevoked",
        )
| |
def _ee_revoked_crl_ku_ee_depth() -> None:
    """Emit tests where the EE cert is revoked by an issuer whose key usage includes cRLSign."""
    test_name = "ee_revoked_crl_ku_ee_depth"
    end_entity = crl_ku_chain[0][0]
    issuing_key = crl_ku_chain[1][2]

    # The CRL lists the EE serial and is signed by the EE issuer, whose key usage includes
    # the cRLSign bit.
    revoking_crl = _crl(
        serials=[end_entity.serial_number],
        issuer_name=end_entity.issuer,
        issuer_key=issuing_key,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, revoking_crl.public_bytes(Encoding.DER), force)

    # The CRL is relevant, verifies and lists the EE serial: expect the cert to be reported
    # as revoked.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=crl_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            expected_error="CertRevoked",
        )
| |
def _no_crls_test_chain_depth() -> None:
    """Emit a single (borrowed-only) test validating the no-KU chain with an empty CRL list."""
    test_name = "no_crls_test_chain_depth"
    # With no CRLs provided, the chain is expected to verify without error.
    _revocation_test(
        test_name=test_name,
        chain=no_ku_chain,
        crl_paths=[],
        owned=False,
        expected_error=None,
    )
| |
def _no_relevant_crl_chain_depth() -> None:
    """Emit tests where the only CRL lists intermediate A's serial but has an unknown issuer."""
    test_name = "no_relevant_crl_chain_depth"
    intermediate_a = no_ku_chain[1][0]

    # The CRL names the first intermediate's serial, yet its issuer name is unrelated to the
    # chain.
    unrelated_crl = _crl(
        serials=[intermediate_a.serial_number],
        issuer_name=subject_name_for_test("whatev", test_name),
        issuer_key=None,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, unrelated_crl.public_bytes(Encoding.DER), force)

    # A CRL that isn't relevant to the chain must not stop it from verifying.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=no_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            expected_error=None,
        )
| |
def _int_not_revoked_chain_depth() -> None:
    """Emit tests where intermediate A's issuer publishes a CRL that does not list A's serial."""
    test_name = "int_not_revoked_chain_depth"
    intermediate_a = no_ku_chain[1][0]
    issuing_key = no_ku_chain[2][2]

    # Issued and signed by intermediate A's issuer (intermediate B), but revoking some other
    # serial.
    other_serial_crl = _crl(
        serials=[12345],  # Deliberately not intermediate A's serial.
        issuer_name=intermediate_a.issuer,
        issuer_key=issuing_key,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, other_serial_crl.public_bytes(Encoding.DER), force)

    # The CRL is relevant and verifies, but the intermediate serial is absent, so the chain
    # should verify without error.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=no_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            expected_error=None,
        )
| |
def _int_revoked_badsig_chain_depth() -> None:
    """Emit tests where the CRL revoking intermediate A is signed with the wrong key."""
    test_name = "int_revoked_badsig_chain_depth"
    intermediate_a = no_ku_chain[1][0]

    # NB: the CRL claims intermediate A's issuer name but is signed with a freshly generated key
    # rather than that issuer's key, so its signature cannot verify.
    unrelated_key: ec.EllipticCurvePrivateKey = ec.generate_private_key(
        ec.SECP256R1(), default_backend()
    )
    badsig_crl = _crl(
        serials=[intermediate_a.serial_number],
        issuer_name=intermediate_a.issuer,
        issuer_key=unrelated_key,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, badsig_crl.public_bytes(Encoding.DER), force)

    # A relevant CRL that lists the intermediate cert serial but fails signature verification
    # should error.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=no_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            expected_error="InvalidCrlSignatureForPublicKey",
        )
| |
def _int_revoked_wrong_ku_chain_depth() -> None:
    """Emit tests where intermediate A is revoked by an issuer whose key usage lacks cRLSign."""
    test_name = "int_revoked_wrong_ku_chain_depth"
    intermediate_a = no_crl_ku_chain[1][0]
    issuing_key = no_crl_ku_chain[2][2]

    # The CRL lists intermediate A's serial and is signed by its issuer, whose certificate
    # specifies a key usage that does not include the cRLSign bit.
    revoking_crl = _crl(
        serials=[intermediate_a.serial_number],
        issuer_name=intermediate_a.issuer,
        issuer_key=issuing_key,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, revoking_crl.public_bytes(Encoding.DER), force)

    # Validation should report that the CRL issuer is not permitted to sign CRLs.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=no_crl_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            expected_error="IssuerNotCrlSigner",
        )
| |
def _ee_revoked_chain_depth() -> None:
    """Emit chain-depth-named tests where the EE cert itself is revoked by its issuer."""
    test_name = "ee_revoked_chain_depth"
    end_entity = no_ku_chain[0][0]
    issuing_key = no_ku_chain[1][2]

    # The CRL lists the EE serial and is signed by the EE issuer (which has no KU extension).
    revoking_crl = _crl(
        serials=[end_entity.serial_number],
        issuer_name=end_entity.issuer,
        issuer_key=issuing_key,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, revoking_crl.public_bytes(Encoding.DER), force)

    # Checking revocation for the whole chain implies checking the end entity too, so a
    # verifying CRL that lists the EE serial should yield a revoked-cert error.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=no_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            expected_error="CertRevoked",
        )
| |
def _int_revoked_no_ku_chain_depth() -> None:
    """Emit tests where intermediate A is revoked by an issuer with no key usage specified."""
    test_name = "int_revoked_no_ku_chain_depth"
    intermediate_a = no_ku_chain[1][0]
    issuing_key = no_ku_chain[2][2]

    # The CRL lists intermediate A's serial and is signed by its issuer (which has no KU
    # extension).
    revoking_crl = _crl(
        serials=[intermediate_a.serial_number],
        issuer_name=intermediate_a.issuer,
        issuer_key=issuing_key,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, revoking_crl.public_bytes(Encoding.DER), force)

    # The CRL is relevant, verifies and lists the intermediate serial: expect a revoked-cert
    # error because an intermediate in the chain is revoked.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=no_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            expected_error="CertRevoked",
        )
| |
def _int_revoked_crl_ku_chain_depth() -> None:
    """Emit tests where intermediate A is revoked by an issuer whose key usage includes cRLSign."""
    test_name = "int_revoked_crl_ku_chain_depth"
    intermediate_a = crl_ku_chain[1][0]
    issuing_key = crl_ku_chain[2][2]

    # The CRL lists intermediate A's serial and is signed by its issuer, whose key usage
    # includes the cRLSign bit.
    revoking_crl = _crl(
        serials=[intermediate_a.serial_number],
        issuer_name=intermediate_a.issuer,
        issuer_key=issuing_key,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, revoking_crl.public_bytes(Encoding.DER), force)

    # The CRL is relevant, verifies and lists the intermediate cert serial: expect the cert to
    # be reported as revoked.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=crl_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            expected_error="CertRevoked",
        )
| |
def _ee_with_top_bit_set_serial_revoked() -> None:
    """Emit tests where the revoked EE cert is the variant whose serial has the top bit set."""
    test_name = "ee_with_top_bit_set_serial_revoked"
    # Index 4 of the chain holds the second EE cert, the one with the top-bit-set serial.
    topbit_end_entity = crl_ku_chain[4][0]
    issuing_key = crl_ku_chain[1][2]

    revoking_crl = _crl(
        serials=[topbit_end_entity.serial_number],
        issuer_name=topbit_end_entity.issuer,
        issuer_key=issuing_key,
    )
    crl_path = os.path.join(output_dir, f"{test_name}.crl.der")
    write_der(crl_path, revoking_crl.public_bytes(Encoding.DER), force)

    # Validate the top-bit EE cert (not the regular one) and expect it to be reported as revoked.
    for suffix, use_owned in (("", False), ("_owned", True)):
        _revocation_test(
            test_name=test_name + suffix,
            chain=crl_ku_chain,
            crl_paths=[crl_path],
            owned=use_owned,
            ee_topbit_serial=True,
            expected_error="CertRevoked",
        )
| |
# Rewrite the generated part of client_auth_revocation.rs: trim_top keeps the file's top and
# hands back the open file, then each generator appends its tests in the order listed here.
with trim_top("client_auth_revocation.rs") as output:
    for emit_tests in (
        _no_crls_test_ee_depth,
        _no_relevant_crl_ee_depth,
        _ee_not_revoked_ee_depth,
        _ee_revoked_badsig_ee_depth,
        _ee_revoked_wrong_ku_ee_depth,
        _ee_not_revoked_wrong_ku_ee_depth,
        _ee_revoked_no_ku_ee_depth,
        _ee_revoked_crl_ku_ee_depth,
        _no_crls_test_chain_depth,
        _no_relevant_crl_chain_depth,
        _int_not_revoked_chain_depth,
        _int_revoked_badsig_chain_depth,
        _int_revoked_wrong_ku_chain_depth,
        _ee_revoked_chain_depth,
        _int_revoked_no_ku_chain_depth,
        _int_revoked_crl_ku_chain_depth,
        _ee_with_top_bit_set_serial_revoked,
    ):
        emit_tests()
| |
| |
if __name__ == "__main__":
    # Command-line entry point: pick which families of testcases to (re)generate, then
    # optionally run `cargo fmt` and `cargo test` afterwards.
    parser = argparse.ArgumentParser()

    # (flag, default, help text) for every on/off switch. BooleanOptionalAction also registers
    # the matching `--no-<flag>` form for each of them.
    for flag, default, help_text in (
        ("--tls-server-certs", True, "Generate TLS server certificate testcases"),
        ("--signatures", True, "Generate signature testcases"),
        ("--client-auth", True, "Generate client auth testcases"),
        (
            "--client-auth-revocation",
            True,
            "Generate client auth revocation testcases",
        ),
        ("--format", True, "Run cargo fmt post-generation"),
        ("--test", True, "Run cargo test post-generation"),
        ("--force", False, "Overwrite existing test keys/certs"),
    ):
        parser.add_argument(
            flag,
            action=argparse.BooleanOptionalAction,
            default=default,
            help=help_text,
        )
    args = parser.parse_args()

    if args.tls_server_certs:
        tls_server_certs(args.force)
    if args.signatures:
        signatures(args.force)
    if args.client_auth:
        client_auth(args.force)
    if args.client_auth_revocation:
        client_auth_revocation(args.force)

    # Run cargo directly with an argument list; no shell is needed for a fixed command, and
    # check=True still aborts the script on a non-zero exit status.
    if args.format:
        subprocess.run(["cargo", "fmt"], check=True)
    if args.test:
        subprocess.run(["cargo", "test"], check=True)