diff --git a/CHANGELOG.md b/CHANGELOG.md index 06984e5..08e42be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,51 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [1.1.0] - 2026-02-12 +### Added +- Command-line interface (`certapi` / `cli`) exposing common workflows: `issue`, `renew`, `list`, and `revoke`. +- Config file support and environment variable overrides for local/CI usage. +- Enhanced logging, debug flags, and more informative CLI error messages. +### Changed +- Improved CLI-friendly output formats (plain text and JSON) for scripting and automation. +### Fixed +- Various integration and usability issues discovered during CLI testing. + +## [1.0.5] - 2026-02-08 +### Added +- Packaging and CI improvements: `pyproject.toml` / `requirements.txt` updates and release automation tweaks. +### Fixed +- Docker image tagging and Dockerfile fixes for reproducible builds. +- Miscellaneous minor bugfixes and documentation tweaks. + +## [1.0.4] - 2026-02-02 +### Added +- Postgres keystore robustness improvements and better sqlite fallback handling. +### Fixed +- Packaging metadata and dependency pinning issues causing install-time warnings. + +## [1.0.3] - 2026-01-28 +### Added +- Improved DNS provider integrations (Cloudflare/DigitalOcean) for TXT record cleanup. +### Fixed +- Race conditions during challenge creation and cleanup under heavy concurrency. +- Robustness fixes for order certificate retrieval and decoding. + +## [1.0.2] - 2026-01-22 +### Added +- Additional sanity checks when loading keys and certificates from keystores. +### Fixed +- Retry/backoff handling for transient HTTP and DNS provider errors. +- Test stability fixes for challenge cleanup routines. + +## [1.0.1] - 2026-01-20 +### Added +- Small improvements to logging and diagnostic output for ACME flows. +### Fixed +- Keystore path handling edge-cases that caused certificate lookups to fail. +- Minor bugfixes in ACME error parsing to avoid missing-detail exceptions. + ## [1.0.0] - 2026-01-15 ### Added - Production Docker image (multi-arch, rootless, Gunicorn, port `8080`). @@ -15,6 +60,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - Standardized environment variables, error handling, and minor typos. + ## [0.6.0] - 2026-01-12 ### ToDo - [] Certapi api and docker image diff --git a/Developer.md b/Developer.md new file mode 100644 index 0000000..6ec751f --- /dev/null +++ b/Developer.md @@ -0,0 +1,82 @@ +Developer Guide +============== + +Using CertApi as a Library +-------------------------- + +The library supports both low-level ACME operations and a higher-level manager that handles +storage and renewals. Pick the approach that fits your integration needs. + +1.  Low-Level API: Certificate with Cloudflare +------------------------------------------ + +```python +import json +from certapi import CertApiException, CloudflareChallengeSolver, Key, AcmeCertIssuer + + +# Initialize the Cloudflare challenge solver +# The API key is read from the CLOUDFLARE_API_KEY environment variable, or you can set it below. +challenge_solver = CloudflareChallengeSolver(api_key=None) + +# Initialize cert issuer with a new account key +cert_issuer = AcmeCertIssuer(Key.generate("ecdsa"), challenge_solver) + +# Perform setup i.e. fetching directory and registering ACME account +cert_issuer.setup() + +try: + # Obtain a certificate for your domain + (key, cert) = cert_issuer.generate_key_and_cert_for_domain("your-domain.com") + + print("------ Private Key -----") + print(key.to_pem()) + print("------- Certificate ------") + print(cert) +except CertApiException as e: + print("An error occurred:", json.dumps(e.json_obj(), indent=2)) +``` + +2.  High-Level API: AcmeCertManager +------------------------------- + +The `AcmeCertManager` provides a high-level interface that handles certificate storage, +automatic renewal checks, and multi-solver management. + +```python +from certapi import ( + AcmeCertManager, + FileSystemKeyStore, + AcmeCertIssuer, + CloudflareChallengeSolver, +) + +# 1. Setup KeyStore to persist keys and certificates +key_store = FileSystemKeyStore("db") + + +# DNS-01 via Cloudflare (e.g. for wildcard certs or internal domains) +dns_solver = CloudflareChallengeSolver(api_key="your-cloudflare-token") + +# 3. Initialize and Setup AcmeCertManager +# Create cert issuer with the default challenge solver +cert_issuer = AcmeCertIssuer.with_keystore(key_store, dns_solver) + +cert_manager = AcmeCertManager( + key_store=key_store, + cert_issuer=cert_issuer, + challenge_solvers=[dns_solver], # other solvers can be used +) +cert_manager.setup() + +# 4. Issue or Reuse Certificate +# Automatically checks and saves to keystore. Renews only if necessary. +response = cert_manager.issue_certificate(["example.com", "www.example.com"]) + +for cert_data in response.issued: + print(f"Newly issued for: {cert_data.domains}") + print(cert_data.cert) + +for cert_data in response.existing: + print(f"Reusing existing for: {cert_data.domains}") +``` diff --git a/README.md b/README.md index 1963fee..da22a6a 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,16 @@ CertApi is a base library for building other tools, or to integrate Certificate [![codecov](https://codecov.io/github/mesudip/certapi/graph/badge.svg?token=NYTNCH29IT)](https://codecov.io/github/mesudip/certapi) [![PyPI version](https://img.shields.io/pypi/v/certapi.svg)](https://pypi.org/project/certapi/) +## Why another library? + +I designed this library so that it can be imported and plugged in to other python projects. Goal is not to provide CLIs or quick working demo, but to be versatile for any use case. + +- Pluggable keystores for keys and certificates +- Pluggable Challenge solvers for DNS and Http challenge solving +- High-level manager with renewal checks and multi-solver support +- Same interface for working locally, or requesting certificate from certapi server. + +See the developer guide in [Developer.md](Developer.md) for library usage and workflows. ## Installation @@ -19,75 +29,38 @@ You can install CertApi using pip pip install certapi ``` -## Example: Low Leve API : Certificate with Cloudflare - -```python -import json -from certapi import CertApiException, CloudflareChallengeSolver, Key, AcmeCertIssuer - - -# Initialize the Cloudflare challenge solver -# The API key is read from the CLOUDFLARE_API_KEY environment variable, or you can set it below. -challenge_solver = CloudflareChallengeSolver(api_key=None) - -## initialize cert issuer with a new account key -cert_issuer = AcmeCertIssuer(Key.generate('ecdsa'), challenge_solver) - -# Preform setup i.e. fetching directory and registering ACME account -cert_issuer.setup() - -try: - # Obtain a certificate for your domain - (key, cert) = cert_issuer.generate_key_and_cert_for_domain("your-domain.com") +## CLI - print("------ Private Key -----") - print(key.to_pem()) - print("------- Certificate ------") - print(cert) -except CertApiException as e: - print(f"An error occurred:", json.dumps(e.json_obj(), indent=2)) +CertApi also ships with a CLI for quick verification and certificate issuance. +```bash +## Certapi's dependencies are already included in the python installation. This doesn't affect the system. +sudo python3 -m pip install certapi --break-system-packages ``` +### 1. With HTTP Challenge -## Example: High Level API (with AcmeCertManager) - -The `AcmeCertManager` provides a high-level interface that handles certificate storage, automatic renewal checks, and multi-solver management. - -```python -from certapi import ( - AcmeCertManager, - FileSystemKeyStore, - AcmeCertIssuer, - CloudflareChallengeSolver -) - -# 1. Setup KeyStore to persist keys and certificates -key_store = FileSystemKeyStore("db") +HTTP challenge requires you to have seup the DNS correctly. The ACME server will verify your domain by making an HTTP request. +```bash +# Verify environment and HTTP routing +sudo certapi verify example.com www.example.com -# DNS-01 via Cloudflare (e.g. for wildcard certs or internal domains) -dns_solver = CloudflareChallengeSolver(api_token="your-cloudflare-token") +# Obtain a certificate (requires root for HTTP-01) +sudo certapi obtain example.com www.example.com +``` -# 3. Initialize and Setup AcmeCertManager -# Create cert issuer with the default challenge solver -cert_issuer = AcmeCertIssuer.with_keystore(key_store, dns_solver) +### 2. With DNS Provider Key -cert_manager = AcmeCertManager( - key_store=key_store, - cert_issuer=cert_issuer, - challenge_solvers=[dns_solver], # other solvers can be used - ) -cert_manager.setup() +Using DNS-01 challenge with Cloudflare doesn't require the DNS to be setup. Set your Cloudflare API key as an environment variable. -# 4. Issue or Reuse Certificate -# Automatically checks sand saves to keystore. Renews only if necessary. -response = cert_manager.issue_certificate(["example.com", "www.example.com"]) +```bash +# Set Cloudflare API key or token +export CLOUDFLARE_API_KEY=... # or CLOUDFLARE_API_TOKEN -for cert_data in response.issued: - print(f"Newly issued for: {cert_data.domains}") - print(cert_data.cert) +# Verify DNS configuration +sudo certapi verify example.com -for cert_data in response.existing: - print(f"Reusing existing for: {cert_data.domains}") -``` +# Obtain a certificate using DNS-01 +sudo certapi obtain example.com www.example.com +``` \ No newline at end of file diff --git a/setup.py b/setup.py index fe6962a..07833dc 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="certapi", - version="1.0.5", + version="1.1.0", packages=find_packages(where="src"), package_dir={"": "src"}, install_requires=[ @@ -22,4 +22,9 @@ "Operating System :: OS Independent", ], python_requires=">=3.6", + entry_points={ + "console_scripts": [ + "certapi=certapi.cli:main", + ], + }, ) diff --git a/src/certapi/challenge_solver/InmemoryChallengeSolver.py b/src/certapi/challenge_solver/InmemoryChallengeSolver.py index f46d74e..4b1f622 100644 --- a/src/certapi/challenge_solver/InmemoryChallengeSolver.py +++ b/src/certapi/challenge_solver/InmemoryChallengeSolver.py @@ -13,6 +13,9 @@ def __init__(self): def supported_challenge_type(self) -> Literal["http-01"]: return "http-01" + def supports_domain(self, domain: str) -> bool: + return "*" not in domain + def save_challenge(self, key: str, value: str, domain: str = None): self.challenges[key] = value @@ -23,6 +26,9 @@ def delete_challenge(self, key: str, domain: str = None): if key in self.challenges: del self.challenges[key] + def cleanup_old_challenges(self): + self.challenges.clear() + def __iter__(self): return iter(self.challenges) diff --git a/src/certapi/cli.py b/src/certapi/cli.py new file mode 100644 index 0000000..d717f7a --- /dev/null +++ b/src/certapi/cli.py @@ -0,0 +1,242 @@ +import argparse +import os +import secrets +import subprocess +import sys +import threading +from http.server import BaseHTTPRequestHandler +from socketserver import TCPServer +from typing import List, Optional, Tuple +from urllib.parse import unquote + +import requests + +from certapi import ( + AcmeCertIssuer, + CertApiException, + CloudflareChallengeSolver, + FileSystemKeyStore, + InMemoryChallengeSolver, +) +from certapi.crypto import certs_from_pem + + +def is_root() -> bool: + """Check if running with elevated privileges (cross-platform).""" + try: + if os.name == "nt": # Windows + import ctypes + + return ctypes.windll.shell32.IsUserAnAdmin() != 0 + else: # Unix-like (Linux, macOS) + return os.geteuid() == 0 + except (AttributeError, ImportError, OSError): + return False + + +def find_process_on_port(port: int) -> List[str]: + """Find processes using a specific port (cross-platform).""" + try: + if os.name == "nt": # Windows + result = subprocess.check_output(["netstat", "-ano"], stderr=subprocess.DEVNULL).decode().strip() + pids = [] + for line in result.split("\n"): + if f":{port}" in line and "LISTENING" in line: + parts = line.split() + if parts: + pids.append(parts[-1]) + return pids + else: # Unix-like (Linux, macOS) + result = ( + subprocess.check_output(["lsof", "-i", f":{port}", "-t"], stderr=subprocess.DEVNULL).decode().strip() + ) + return result.split("\n") if result else [] + except (subprocess.CalledProcessError, FileNotFoundError): + return [] + + +def _start_http_challenge_server( + challenge_solver: InMemoryChallengeSolver, port: int = 80 +) -> Tuple[TCPServer, threading.Thread]: + class AcmeChallengeHandler(BaseHTTPRequestHandler): + def do_GET(self): + prefix = "/.well-known/acme-challenge/" + if not self.path.startswith(prefix): + self.send_error(404) + return + token = unquote(self.path[len(prefix) :]) + if not token or "/" in token or "\\" in token: + self.send_error(400) + return + content = challenge_solver.get_challenge(token) + if not content: + self.send_error(404) + return + if isinstance(content, str): + content = content.encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "text/plain") + self.send_header("Content-Length", str(len(content))) + self.end_headers() + self.wfile.write(content) + + def log_message(self, format, *args): + return + + class ReusableTCPServer(TCPServer): + allow_reuse_address = True + + server = ReusableTCPServer(("", port), AcmeChallengeHandler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + return server, thread + + +def _resolve_cloudflare_api_key() -> Optional[str]: + return os.environ.get("CLOUDFLARE_API_KEY") or os.environ.get("CLOUDFLARE_API_TOKEN") + + +def _ensure_port_80_available() -> None: + if not is_root(): + print("Must be run as root to bind to port 80 for HTTP challenge.") + sys.exit(1) + + while True: + pids = find_process_on_port(80) + if not pids: + return + print(f"Process(es) running on port 80: {', '.join(pids)}") + response = input("Stop the process(es) above, then press Enter to retry (or type 'q' to quit): ") + if response.strip().lower() == "q": + sys.exit(1) + + +def obtain_certificate(domains: List[str], api_key: Optional[str] = None): + challenge_solver = None + server = None + + if api_key: + challenge_solver = CloudflareChallengeSolver(api_key=api_key) + print("Using Cloudflare DNS challenge.") + else: + _ensure_port_80_available() + challenge_solver = InMemoryChallengeSolver() + print("Starting HTTP challenge server on port 80...") + server, _ = _start_http_challenge_server(challenge_solver, port=80) + + keystore_path = "/etc/ssl" + key_store = FileSystemKeyStore(keystore_path) + cert_issuer = AcmeCertIssuer.with_keystore( + key_store, + challenge_solver, + account_key_name="acme_account", + ) + cert_issuer.setup() + try: + key, cert = cert_issuer.generate_key_and_cert_for_domains(domains, key_type="rsa") + key_name = domains[0] + key_id = key_store.save_key(key, key_name) + key_store.save_cert(key_id, cert, domains, name=key_name) + + cert_chain = certs_from_pem(cert.encode("utf-8")) + leaf_cert = cert_chain[0] if cert_chain else None + expiry = leaf_cert.not_valid_after.isoformat() if leaf_cert else "unknown" + + key_path = os.path.join(key_store.keys_dir, f"{key_name}.key") + cert_path = os.path.join(key_store.certs_dir, f"{key_name}.crt") + print(f"\n Certificate expires at: {expiry}") + print(f" Key path: {key_path}") + print(f" Cert path: {cert_path}") + except CertApiException as e: + print("An error occurred:") + print(e.json_obj()) + finally: + if server is not None: + server.shutdown() + server.server_close() + + +def verify_environment(domains: List[str], api_key: Optional[str] = None) -> None: + if api_key: + print("[verify] Cloudflare API key detected; DNS-01 challenge available") + if domains: + solver = CloudflareChallengeSolver(api_key=api_key) + unsupported = [domain for domain in domains if not solver.supports_domain(domain)] + if unsupported: + print("[verify] Warning: Cloudflare account does not appear to manage:") + for domain in unsupported: + print(f"[verify] - {domain}") + else: + print("[verify] Cloudflare account appears to manage the provided domain(s)") + else: + print("[verify] HTTP-01 challenge will be used") + if not domains: + print("[verify] No domains provided; skipping HTTP routing check") + return + if is_root(): + pids = find_process_on_port(80) + if pids: + print(f"[verify] Warning: port 80 is in use by process(es): {', '.join(pids)}") + else: + print("[verify] Port 80 is available") + else: + print("[verify] Warning: not running as root; port 80 binding will fail") + return + + _ensure_port_80_available() + challenge_solver = InMemoryChallengeSolver() + print("[verify] Starting HTTP challenge server on port 80...") + server, _ = _start_http_challenge_server(challenge_solver, port=80) + ok = 0 + failed = 0 + try: + for domain in domains: + token = secrets.token_urlsafe(24) + value = secrets.token_urlsafe(32) + challenge_solver.save_challenge(token, value, domain) + url = f"http://{domain}/.well-known/acme-challenge/{token}" + print(f"[verify] Checking HTTP routing for {domain}...") + try: + response = requests.get(url, allow_redirects=False, timeout=5) + if response.status_code == 200 and response.text.strip() == value: + ok += 1 + print(f"[verify] OK: {domain} routes to this server") + else: + failed += 1 + print( + f"[verify] FAILED: {domain} returned status {response.status_code} " + f"with body '{response.text.strip()}'" + ) + except requests.RequestException as exc: + failed += 1 + print(f"[verify] FAILED: {domain} request error: {exc}") + finally: + challenge_solver.delete_challenge(token, domain) + finally: + server.shutdown() + server.server_close() + print(f"\n Summary: {ok} OK, {failed} FAILED") + + +def main(): + parser = argparse.ArgumentParser(prog="certapi") + subparsers = parser.add_subparsers(dest="command") + + verify_parser = subparsers.add_parser("verify", help="Verify certapi installation and environment.") + verify_parser.add_argument("domains", nargs="*", help="Optional domain(s) to verify.") + + obtain_parser = subparsers.add_parser("obtain", help="Obtain certificate for domains.") + obtain_parser.add_argument("domains", nargs="+", help="Domain(s) to obtain certificate for.") + + args = parser.parse_args() + + if args.command == "verify": + api_key = _resolve_cloudflare_api_key() + verify_environment(args.domains, api_key=api_key) + sys.exit(0) + elif args.command == "obtain": + api_key = _resolve_cloudflare_api_key() + obtain_certificate(args.domains, api_key=api_key) + else: + parser.print_help() + sys.exit(1) diff --git a/src/certapi/issuers/AcmeCertIssuer.py b/src/certapi/issuers/AcmeCertIssuer.py index 5fd0488..884f746 100644 --- a/src/certapi/issuers/AcmeCertIssuer.py +++ b/src/certapi/issuers/AcmeCertIssuer.py @@ -25,7 +25,7 @@ def __init__( def with_keystore( key_store: "KeyStore", challenge_solver: ChallengeSolver, - account_key_name: str = "acme_account.key", + account_key_name: str = "acme_account", acme_url: str = None, ) -> "AcmeCertIssuer": account_key, _ = key_store._get_or_generate_key(account_key_name, "rsa") diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..e914d3e --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,385 @@ +import builtins +import http.client +import types +from datetime import datetime, timezone +from unittest.mock import Mock + +import pytest +import requests + +import certapi.cli as cli +from certapi.challenge_solver.InmemoryChallengeSolver import InMemoryChallengeSolver +from certapi.errors import CertApiException + + +class FakeKey: + def to_pem(self): + return "KEY" + + +class FakeKeyStore: + def __init__(self, base_dir): + self.keys_dir = f"{base_dir}/keys" + self.certs_dir = f"{base_dir}/certs" + self.saved = [] + + def save_key(self, key, name): + self.saved.append(("key", name)) + return "key-id" + + def save_cert(self, key_id, cert, domains, name=None): + self.saved.append(("cert", name, tuple(domains))) + return "cert-id" + + +class FakeAcmeCertIssuer: + def __init__(self, key_store, solver, account_key_name): + self.key_store = key_store + self.solver = solver + self.account_key_name = account_key_name + self.setup_called = False + self.raise_error = False + + @classmethod + def with_keystore(cls, key_store, solver, account_key_name="acme_account"): + return cls(key_store, solver, account_key_name) + + def setup(self): + self.setup_called = True + + def generate_key_and_cert_for_domains(self, domains, key_type="rsa"): + if self.raise_error: + raise CertApiException("boom", {"reason": "bad"}, step="test") + return FakeKey(), "CERT" + + +class FakeCert: + def __init__(self, not_valid_after): + self.not_valid_after = not_valid_after + + +def test_is_root_true_false(monkeypatch): + monkeypatch.setattr(cli.os, "geteuid", lambda: 0) + assert cli.is_root() is True + + dummy_os = types.SimpleNamespace() + monkeypatch.setattr(cli, "os", dummy_os) + assert cli.is_root() is False + + +def test_find_process_on_port_success(monkeypatch): + monkeypatch.setattr(cli.subprocess, "check_output", lambda *args, **kwargs: b"123\n456\n") + assert cli.find_process_on_port(80) == ["123", "456"] + + +def test_find_process_on_port_empty(monkeypatch): + monkeypatch.setattr(cli.subprocess, "check_output", lambda *args, **kwargs: b"") + assert cli.find_process_on_port(80) == [] + + +def test_find_process_on_port_error(monkeypatch): + def raise_error(*args, **kwargs): + raise cli.subprocess.CalledProcessError(1, "lsof") + + monkeypatch.setattr(cli.subprocess, "check_output", raise_error) + assert cli.find_process_on_port(80) == [] + + +def test_start_http_challenge_server_responses(): + solver = InMemoryChallengeSolver() + solver.save_challenge("token-ok", "value", "example.com") + solver.save_challenge("token-bytes", b"binary", "example.com") + + server, _ = cli._start_http_challenge_server(solver, port=0) + try: + port = server.server_address[1] + + conn = http.client.HTTPConnection("localhost", port) + conn.request("GET", "/.well-known/acme-challenge/token-ok") + response = conn.getresponse() + data = response.read().decode("utf-8") + assert response.status == 200 + assert data == "value" + conn.close() + + conn = http.client.HTTPConnection("localhost", port) + conn.request("GET", "/.well-known/acme-challenge/token-bytes") + response = conn.getresponse() + data = response.read().decode("utf-8") + assert response.status == 200 + assert data == "binary" + conn.close() + + conn = http.client.HTTPConnection("localhost", port) + conn.request("GET", "/not-a-challenge") + response = conn.getresponse() + assert response.status == 404 + response.read() + conn.close() + + conn = http.client.HTTPConnection("localhost", port) + conn.request("GET", "/.well-known/acme-challenge/") + response = conn.getresponse() + assert response.status == 400 + response.read() + conn.close() + + conn = http.client.HTTPConnection("localhost", port) + conn.request("GET", "/.well-known/acme-challenge/missing") + response = conn.getresponse() + assert response.status == 404 + response.read() + conn.close() + finally: + server.shutdown() + server.server_close() + + +def test_resolve_cloudflare_api_key(monkeypatch): + monkeypatch.delenv("CLOUDFLARE_API_KEY", raising=False) + monkeypatch.delenv("CLOUDFLARE_API_TOKEN", raising=False) + assert cli._resolve_cloudflare_api_key() is None + + monkeypatch.setenv("CLOUDFLARE_API_TOKEN", "token") + assert cli._resolve_cloudflare_api_key() == "token" + + monkeypatch.setenv("CLOUDFLARE_API_KEY", "key") + assert cli._resolve_cloudflare_api_key() == "key" + + +def test_ensure_port_80_available_not_root(monkeypatch, capsys): + monkeypatch.setattr(cli, "is_root", lambda: False) + with pytest.raises(SystemExit) as exc: + cli._ensure_port_80_available() + assert exc.value.code == 1 + assert "Must be run as root" in capsys.readouterr().out + + +def test_ensure_port_80_available_quit(monkeypatch): + monkeypatch.setattr(cli, "is_root", lambda: True) + monkeypatch.setattr(cli, "find_process_on_port", lambda port: ["123"]) + monkeypatch.setattr(builtins, "input", lambda _: "q") + + with pytest.raises(SystemExit) as exc: + cli._ensure_port_80_available() + assert exc.value.code == 1 + + +def test_ensure_port_80_available_retries(monkeypatch): + monkeypatch.setattr(cli, "is_root", lambda: True) + calls = iter([["123"], []]) + monkeypatch.setattr(cli, "find_process_on_port", lambda port: next(calls)) + monkeypatch.setattr(builtins, "input", lambda _: "") + + cli._ensure_port_80_available() + + +def test_obtain_certificate_dns_challenge(monkeypatch, capsys): + fake_issuer = FakeAcmeCertIssuer(FakeKeyStore("/etc/ssl"), Mock(), "acme_account") + + def fake_with_keystore(key_store, solver, account_key_name="acme_account"): + fake_issuer.key_store = key_store + fake_issuer.solver = solver + fake_issuer.account_key_name = account_key_name + return fake_issuer + + monkeypatch.setattr(cli, "CloudflareChallengeSolver", lambda api_key=None: Mock(api_key=api_key)) + monkeypatch.setattr(cli, "FileSystemKeyStore", lambda path: FakeKeyStore(path)) + monkeypatch.setattr(cli.AcmeCertIssuer, "with_keystore", staticmethod(fake_with_keystore)) + monkeypatch.setattr( + cli, "certs_from_pem", lambda *args, **kwargs: [FakeCert(datetime(2026, 1, 1, tzinfo=timezone.utc))] + ) + + cli.obtain_certificate(["example.com"], api_key="token") + output = capsys.readouterr().out + assert "Using Cloudflare DNS challenge." in output + assert "Certificate expires at" in output + assert "Key path" in output + assert "Cert path" in output + + +def test_obtain_certificate_http_challenge_unknown_expiry(monkeypatch, capsys): + fake_issuer = FakeAcmeCertIssuer(FakeKeyStore("/etc/ssl"), Mock(), "acme_account") + server = Mock() + + monkeypatch.setattr(cli, "_ensure_port_80_available", lambda: None) + monkeypatch.setattr(cli, "InMemoryChallengeSolver", lambda: InMemoryChallengeSolver()) + monkeypatch.setattr(cli, "_start_http_challenge_server", lambda solver, port=80: (server, Mock())) + monkeypatch.setattr(cli, "FileSystemKeyStore", lambda path: FakeKeyStore(path)) + monkeypatch.setattr( + cli.AcmeCertIssuer, + "with_keystore", + staticmethod(lambda key_store, solver, account_key_name="acme_account": fake_issuer), + ) + monkeypatch.setattr(cli, "certs_from_pem", lambda *args, **kwargs: []) + + cli.obtain_certificate(["example.com"], api_key=None) + output = capsys.readouterr().out + assert "Starting HTTP challenge server" in output + assert "Certificate expires at: unknown" in output + server.shutdown.assert_called_once() + server.server_close.assert_called_once() + + +def test_obtain_certificate_handles_error(monkeypatch, capsys): + fake_issuer = FakeAcmeCertIssuer(FakeKeyStore("/etc/ssl"), Mock(), "acme_account") + fake_issuer.raise_error = True + server = Mock() + + monkeypatch.setattr(cli, "_ensure_port_80_available", lambda: None) + monkeypatch.setattr(cli, "InMemoryChallengeSolver", lambda: InMemoryChallengeSolver()) + monkeypatch.setattr(cli, "_start_http_challenge_server", lambda solver, port=80: (server, Mock())) + monkeypatch.setattr(cli, "FileSystemKeyStore", lambda path: FakeKeyStore(path)) + monkeypatch.setattr( + cli.AcmeCertIssuer, + "with_keystore", + staticmethod(lambda key_store, solver, account_key_name="acme_account": fake_issuer), + ) + + cli.obtain_certificate(["example.com"], api_key=None) + output = capsys.readouterr().out + assert "An error occurred:" in output + assert "boom" in output + server.shutdown.assert_called_once() + server.server_close.assert_called_once() + + +def test_verify_environment_dns_supported(monkeypatch, capsys): + class FakeSolver: + def __init__(self, api_key=None): + self.api_key = api_key + + def supports_domain(self, domain): + return True + + monkeypatch.setattr(cli, "CloudflareChallengeSolver", FakeSolver) + cli.verify_environment(["example.com"], api_key="token") + output = capsys.readouterr().out + assert "Cloudflare account appears to manage" in output + + +def test_verify_environment_dns_unsupported(monkeypatch, capsys): + class FakeSolver: + def __init__(self, api_key=None): + self.api_key = api_key + + def supports_domain(self, domain): + return domain != "bad.example" + + monkeypatch.setattr(cli, "CloudflareChallengeSolver", FakeSolver) + cli.verify_environment(["good.example", "bad.example"], api_key="token") + output = capsys.readouterr().out + assert "Warning: Cloudflare account does not appear to manage" in output + assert "bad.example" in output + + +def test_verify_environment_http_no_domains(monkeypatch, capsys): + monkeypatch.setattr(cli, "is_root", lambda: True) + cli.verify_environment([], api_key=None) + output = capsys.readouterr().out + assert "No domains provided" in output + + +def test_verify_environment_http_not_root(monkeypatch, capsys): + monkeypatch.setattr(cli, "is_root", lambda: False) + + def fail_if_called(): + raise AssertionError("should not be called") + + monkeypatch.setattr(cli, "_ensure_port_80_available", fail_if_called) + cli.verify_environment(["example.com"], api_key=None) + output = capsys.readouterr().out + assert "Warning: not running as root" in output + + +def test_verify_environment_http_checks(monkeypatch, capsys): + solver = InMemoryChallengeSolver() + server = Mock() + + monkeypatch.setattr(cli, "is_root", lambda: True) + monkeypatch.setattr(cli, "find_process_on_port", lambda port: ["999"]) + monkeypatch.setattr(cli, "_ensure_port_80_available", lambda: None) + monkeypatch.setattr(cli, "InMemoryChallengeSolver", lambda: solver) + monkeypatch.setattr(cli, "_start_http_challenge_server", lambda challenge_solver, port=80: (server, Mock())) + + tokens = iter(["token1", "value1", "token2", "value2", "token3", "value3"]) + monkeypatch.setattr(cli.secrets, "token_urlsafe", lambda _: next(tokens)) + + def fake_get(url, allow_redirects=False, timeout=5): + if "token1" in url: + return Mock(status_code=200, text="value1") + if "token2" in url: + return Mock(status_code=500, text="oops") + raise requests.RequestException("boom") + + monkeypatch.setattr(cli.requests, "get", fake_get) + + cli.verify_environment(["ok.example", "bad.example", "err.example"], api_key=None) + output = capsys.readouterr().out + assert "Warning: port 80 is in use" in output + assert "OK: ok.example" in output + assert "FAILED: bad.example" in output + assert "FAILED: err.example" in output + assert "Summary: 1 OK, 2 FAILED" in output + assert len(solver) == 0 + server.shutdown.assert_called_once() + server.server_close.assert_called_once() + + +def test_verify_environment_http_port_available(monkeypatch, capsys): + solver = InMemoryChallengeSolver() + server = Mock() + + monkeypatch.setattr(cli, "is_root", lambda: True) + monkeypatch.setattr(cli, "find_process_on_port", lambda port: []) + monkeypatch.setattr(cli, "_ensure_port_80_available", lambda: None) + monkeypatch.setattr(cli, "InMemoryChallengeSolver", lambda: solver) + monkeypatch.setattr(cli, "_start_http_challenge_server", lambda challenge_solver, port=80: (server, Mock())) + monkeypatch.setattr(cli.secrets, "token_urlsafe", lambda _: "token") + monkeypatch.setattr(cli.requests, "get", lambda *args, **kwargs: Mock(status_code=200, text="token")) + + cli.verify_environment(["ok.example"], api_key=None) + output = capsys.readouterr().out + assert "Port 80 is available" in output + + +def test_main_verify(monkeypatch): + called = {} + + def fake_verify(domains, api_key=None): + called["domains"] = domains + called["api_key"] = api_key + + monkeypatch.setattr(cli, "verify_environment", fake_verify) + monkeypatch.setattr(cli, "_resolve_cloudflare_api_key", lambda: "token") + monkeypatch.setattr(cli.sys, "argv", ["certapi", "verify", "example.com"]) + + with pytest.raises(SystemExit) as exc: + cli.main() + + assert exc.value.code == 0 + assert called["domains"] == ["example.com"] + assert called["api_key"] == "token" + + +def test_main_obtain(monkeypatch): + called = {} + + def fake_obtain(domains, api_key=None): + called["domains"] = domains + called["api_key"] = api_key + + monkeypatch.setattr(cli, "obtain_certificate", fake_obtain) + monkeypatch.setattr(cli, "_resolve_cloudflare_api_key", lambda: None) + monkeypatch.setattr(cli.sys, "argv", ["certapi", "obtain", "example.com"]) + + cli.main() + assert called["domains"] == ["example.com"] + assert called["api_key"] is None + + +def test_main_help(monkeypatch): + monkeypatch.setattr(cli.sys, "argv", ["certapi"]) + with pytest.raises(SystemExit) as exc: + cli.main() + assert exc.value.code == 1