#!/usr/bin/env python3 import os import sys import subprocess import datetime from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography import x509 from acme import client, messages, crypto_util, challenges from acme.errors import ConflictError, ValidationError import josepy import click @click.group() def cli(): pass @cli.command(name="extract_domains") @click.argument('csr_file', type=click.File('rb')) def extract_domains(csr_file): csr = x509.load_pem_x509_csr(csr_file.read(), default_backend()) san_ext = csr.extensions.get_extension_for_oid(x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME) sans = san_ext.value.get_values_for_type(x509.DNSName) print(' '.join(sans)) @cli.command(name="remaining_days") @click.argument('crt_file', type=click.File('rb')) def remaining_days(crt_file): crt = x509.load_pem_x509_certificate(crt_file.read(), default_backend()) print((crt.not_valid_after - datetime.datetime.now()).days) @cli.command(name="get_cert") @click.option('--directory', default="https://acme-staging-v02.api.letsencrypt.org/directory") @click.option('--acc', required=True, type=click.File('rb')) @click.option('--csr', required=True, type=click.File('rb')) @click.argument('challenge_hook', nargs=-1) def get_cert(directory, acc, csr, challenge_hook): acc_key = serialization.load_pem_private_key(acc.read(), None, default_backend()) jose = josepy.JWKRSA(key=acc_key) net = client.ClientNetwork(jose, user_agent="acme-primitives") directory = messages.Directory.from_json(net.get(directory).json()) client_acme = client.ClientV2(directory, net=net) try: regr = client_acme.new_account(messages.NewRegistration.from_data(terms_of_service_agreed=True)) except ConflictError as e: regr = client_acme.update_registration(messages.RegistrationResource(body=messages.Registration.from_data(terms_of_service_agreed=True))) csr_data = csr.read() orderr = client_acme.new_order(csr_data) for authz in orderr.authorizations: domain = authz.body.identifier.value for i in authz.body.challenges: if isinstance(i.chall, challenges.DNS01): response, validation = i.response_and_validation(jose) subprocess.run([*challenge_hook, i.validation_domain_name(domain), validation], env=os.environ.copy(), check=True) client_acme.answer_challenge(i, response) try: finalized_orderr = client_acme.poll_and_finalize(orderr) except ValidationError as e: for auth in e.failed_authzrs: msg += '\n Authorization for identifier %s failed.' % ( auth.body.identifier) msg += '\n Here are the challenges that were not fulfilled:' for challenge in auth.body.challenges: msg += \ '\n Challenge Type: %s' \ '\n Error information: ' \ '\n Type: %s' \ '\n Details: %s \n\n' % ( challenge.chall.typ, challenge.error.typ if challenge.error else '', challenge.error.detail if challenge.error else '', ) print(msg, file=sys.stderr, flush=True) sys.exit(1) print(finalized_orderr.fullchain_pem) if __name__ == '__main__': cli()