Coverage for amqtt/contrib/cert.py: 86%
73 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1from dataclasses import dataclass
2from datetime import datetime, timedelta
4try:
5 from datetime import UTC
6except ImportError:
7 # support for python 3.10
8 from datetime import timezone
9 UTC = timezone.utc
12from ipaddress import IPv4Address
13import logging
14from pathlib import Path
15import re
17from cryptography import x509
18from cryptography.hazmat.backends import default_backend
19from cryptography.hazmat.primitives import hashes, serialization
20from cryptography.hazmat.primitives.asymmetric import rsa
21from cryptography.x509 import Certificate, CertificateSigningRequest
22from cryptography.x509.oid import NameOID
24from amqtt.plugins.base import BaseAuthPlugin
25from amqtt.session import Session
27logger = logging.getLogger(__name__)
class UserAuthCertPlugin(BaseAuthPlugin):
    """Use a *signed* x509 certificate's `Subject Alternative Name` (`SAN`) to verify client authentication.

    Often used for IoT devices, this method provides the most secure form of identification. A root
    certificate, often referenced as a CA certificate -- either issued by a known authority (such as LetsEncrypt)
    or a self-signed certificate -- is used to sign a private key and certificate for the server. Each device/client
    also gets a unique private key and certificate signed by the same CA certificate; also included in the device
    certificate is a 'SAN' or SubjectAlternativeName which is the device's unique identifier.

    Since both server and device certificates are signed by the same CA certificate, the client can
    verify the server's authenticity; and the server can verify the client's authenticity. And since
    the device's certificate contains a x509 SAN, the server (with this plugin) can identify the device securely.

    !!! note "URI and Client ID configuration"
        `uri_domain` configuration must be set to the same uri used to generate the device credentials

        when a device is connecting with private key and certificate, the `client_id` must
        match the device id used to generate the device credentials.

    Three scripts are available to help with the key generation and certificate signing: `ca_creds`, `server_creds`
    and `device_creds`.

    !!! note "Configuring broker & client for using Self-signed root CA"
        If using self-signed root credentials, both the broker and the client need to be
        configured with `cafile` set to the `ca.crt`.
    """

    async def authenticate(self, *, session: Session) -> bool | None:
        """Verify the client's session using the provided client's x509 certificate.

        The first URI entry of the peer certificate's SAN extension must have the form
        `spiffe://<uri_domain>/device/<device id>` and `<device id>` must equal the session's `client_id`.

        Returns:
            True when the device id embedded in the certificate equals `session.client_id`; False when
            the session has no TLS object, the peer presented no certificate, the certificate has no SAN
            extension or no URI entry in it, or the URI does not match the expected form.

        """
        if not session.ssl_object:
            return False

        der_cert = session.ssl_object.getpeercert(binary_form=True)
        if not der_cert:
            return False

        cert = x509.load_der_x509_certificate(der_cert, backend=default_backend())
        try:
            san = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
        except x509.ExtensionNotFound:
            logger.warning("No SAN extension found.")
            return False

        uris = san.value.get_values_for_type(x509.UniformResourceIdentifier)
        if not uris:
            # A SAN extension may hold only DNS/IP entries; without a URI there is no device identity to check.
            logger.warning("No URI entry found in SAN extension.")
            return False

        # The whole URI must match, so a domain that merely appears somewhere in the URI is rejected as well.
        pattern = rf"spiffe://{re.escape(self.config.uri_domain)}/device/([^/]+)"
        match = re.fullmatch(pattern, uris[0])
        if not match:
            return False

        return match.group(1) == session.client_id

    @dataclass
    class Config:
        """Configuration for the UserAuthCertPlugin."""

        uri_domain: str
        """The domain that is expected as part of the device certificate's spiffe (e.g. test.amqtt.io)"""
def generate_root_creds(country: str, state: str, locality: str,
                        org_name: str, cn: str,
                        validity_days: int = 3650) -> tuple[rsa.RSAPrivateKey, Certificate]:
    """Generate CA key and self-signed CA certificate.

    Args:
        country: value for the subject's country name attribute.
        state: value for the subject's state or province name attribute.
        locality: value for the subject's locality name attribute.
        org_name: value for the subject's organization name attribute.
        cn: value for the subject's common name attribute.
        validity_days: number of days the certificate is valid, counted from now (default: 3650, ~10 years).

    Returns:
        The 4096-bit RSA private key of the CA and the certificate signed with that key.

    """
    # generate private key for the certificate authority
    ca_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=4096,
    )

    # self-signed: subject and issuer are the same name
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, country),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, state),
        x509.NameAttribute(NameOID.LOCALITY_NAME, locality),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, org_name),
        x509.NameAttribute(NameOID.COMMON_NAME, cn),
    ])

    # read the clock once so the validity window spans exactly `validity_days`
    valid_from = datetime.now(UTC)

    # build the self-signed certificate, restricted to signing certificates and CRLs
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(ca_key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(valid_from)
        .not_valid_after(valid_from + timedelta(days=validity_days))
        .add_extension(
            x509.BasicConstraints(ca=True, path_length=None),
            critical=True,
        )
        .add_extension(
            x509.SubjectKeyIdentifier.from_public_key(ca_key.public_key()),
            critical=False,
        )
        .add_extension(
            x509.KeyUsage(
                key_cert_sign=True,
                crl_sign=True,
                digital_signature=False,
                key_encipherment=False,
                content_commitment=False,
                data_encipherment=False,
                key_agreement=False,
                encipher_only=False,
                decipher_only=False,
            ),
            critical=True,
        )
        .sign(ca_key, hashes.SHA256())
    )

    return ca_key, cert
def generate_server_csr(country: str, org_name: str, cn: str) -> tuple[rsa.RSAPrivateKey, CertificateSigningRequest]:
    """Create a 2048-bit RSA key for the server along with a matching certificate-signing-request.

    The request's subject is built from `country`, `org_name` and `cn`; its SAN extension lists
    `cn` as a DNS name plus the loopback address 127.0.0.1.
    """
    server_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    subject = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, country),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, org_name),
        x509.NameAttribute(NameOID.COMMON_NAME, cn),
    ])
    alt_names = x509.SubjectAlternativeName([
        x509.DNSName(cn),
        x509.IPAddress(IPv4Address("127.0.0.1")),
    ])

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)

    # the request is signed with the server's own key
    return server_key, builder.sign(server_key, hashes.SHA256())
def generate_device_csr(country: str, org_name: str, common_name: str,
                        uri_san: str, dns_san: str
                        ) -> tuple[rsa.RSAPrivateKey, CertificateSigningRequest]:
    """Create a 2048-bit RSA key for a device along with a matching certificate-signing-request.

    The request's subject is built from `country`, `org_name` and `common_name`; its SAN extension
    carries `uri_san` as a URI entry followed by `dns_san` as a DNS name.
    """
    device_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    subject = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, country),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, org_name),
        x509.NameAttribute(NameOID.COMMON_NAME, common_name),
    ])
    alt_names = x509.SubjectAlternativeName([
        x509.UniformResourceIdentifier(uri_san),
        x509.DNSName(dns_san),
    ])

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)

    # the request is signed with the device's own key
    return device_key, builder.sign(device_key, hashes.SHA256())
def sign_csr(csr: CertificateSigningRequest,
             ca_key: rsa.RSAPrivateKey,
             ca_cert: Certificate, validity_days: int = 365) -> Certificate:
    """Issue a non-CA certificate for a csr, signed with the CA's key.

    Subject, public key and the SAN extension are taken from the csr; the issuer name and the
    authority key identifier come from `ca_cert`. The certificate is valid from now for
    `validity_days` days. A csr without a SAN extension raises `x509.ExtensionNotFound`.
    """
    builder = x509.CertificateBuilder()
    builder = builder.subject_name(csr.subject)
    builder = builder.issuer_name(ca_cert.subject)
    builder = builder.public_key(csr.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(datetime.now(UTC))
    builder = builder.not_valid_after(datetime.now(UTC) + timedelta(days=validity_days))

    # the issued certificate must not be usable as a certificate authority
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None),
        critical=True,
    )

    # carry over the alternative names requested in the csr
    requested_san = csr.extensions.get_extension_for_class(x509.SubjectAlternativeName).value
    builder = builder.add_extension(requested_san, critical=False)

    # identify the signing CA by its public key
    authority_key_id = x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_cert.public_key())  # type: ignore[arg-type]
    builder = builder.add_extension(authority_key_id, critical=False)

    return builder.sign(ca_key, hashes.SHA256())
def load_ca(ca_key_fn: str, ca_crt_fn: str) -> tuple[rsa.RSAPrivateKey, Certificate]:
    """Read the CA's private key and certificate from pem-encoded files.

    The key file at `ca_key_fn` is loaded without a password; the certificate is read from `ca_crt_fn`.
    """
    key_pem = Path(ca_key_fn).read_bytes()
    ca_key: rsa.RSAPrivateKey = serialization.load_pem_private_key(key_pem, password=None)  # type: ignore[assignment]

    crt_pem = Path(ca_crt_fn).read_bytes()
    ca_cert = x509.load_pem_x509_certificate(crt_pem)

    return ca_key, ca_cert
def write_key_and_crt(key: rsa.RSAPrivateKey, crt: Certificate,
                      prefix: str, path: Path | None = None) -> None:
    """Create pem-encoded files for key and certificate.

    Writes `<prefix>.crt` and `<prefix>.key` into `path` (the current directory when `path` is not given).
    The private key is stored unencrypted, so its file is restricted to owner read/write.

    Args:
        key: private key to serialize in the traditional OpenSSL pem format, without encryption.
        crt: certificate to serialize as pem.
        prefix: file name (without extension) shared by both files.
        path: directory to write the files into.

    """
    path = path or Path()

    crt_fn = path / f"{prefix}.crt"
    key_fn = path / f"{prefix}.key"

    crt_fn.write_bytes(crt.public_bytes(serialization.Encoding.PEM))

    # Restrict permissions *before* any key material is written: `touch` covers a newly created file,
    # `chmod` covers a pre-existing file that may have been created with looser permissions.
    key_fn.touch(mode=0o600)
    key_fn.chmod(0o600)
    key_fn.write_bytes(key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.TraditionalOpenSSL,
        serialization.NoEncryption(),
    ))