Coverage for amqtt/contrib/cert.py: 86%

73 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1from dataclasses import dataclass 

2from datetime import datetime, timedelta 

3 

4try: 

5 from datetime import UTC 

6except ImportError: 

7 # support for python 3.10 

8 from datetime import timezone 

9 UTC = timezone.utc 

10 

11 

12from ipaddress import IPv4Address 

13import logging 

14from pathlib import Path 

15import re 

16 

17from cryptography import x509 

18from cryptography.hazmat.backends import default_backend 

19from cryptography.hazmat.primitives import hashes, serialization 

20from cryptography.hazmat.primitives.asymmetric import rsa 

21from cryptography.x509 import Certificate, CertificateSigningRequest 

22from cryptography.x509.oid import NameOID 

23 

24from amqtt.plugins.base import BaseAuthPlugin 

25from amqtt.session import Session 

26 

27logger = logging.getLogger(__name__) 

28 

29 

class UserAuthCertPlugin(BaseAuthPlugin):
    """Use a *signed* x509 certificate's `Subject Alternative Name` or `SAN` to verify client authentication.

    Often used for IoT devices, this method provides the most secure form of identification. A root
    certificate, often referenced as a CA certificate -- either issued by a known authority (such as LetsEncrypt)
    or self-signed -- is used to sign the server's certificate. Each device/client also gets a unique
    private key and a certificate signed by the same CA certificate; also included in the device
    certificate is a 'SAN' or SubjectAlternativeName which is the device's unique identifier.

    Since both server and device certificates are signed by the same CA certificate, the client can
    verify the server's authenticity; and the server can verify the client's authenticity. And since
    the device's certificate contains a x509 SAN, the server (with this plugin) can identify the device securely.

    !!! note "URI and Client ID configuration"
        `uri_domain` configuration must be set to the same uri used to generate the device credentials

        when a device is connecting with private key and certificate, the `client_id` must
        match the device id used to generate the device credentials.

    There are three scripts available to help with the key generation and certificate signing: `ca_creds`,
    `server_creds` and `device_creds`.

    !!! note "Configuring broker & client for using Self-signed root CA"
        If using self-signed root credentials, the `cafile` configuration for both broker and client need to be
        configured with `cafile` set to the `ca.crt`.
    """

    async def authenticate(self, *, session: Session) -> bool | None:
        """Verify the client's session using the provided client's x509 certificate.

        The first URI entry of the peer certificate's SAN extension must have the form
        `spiffe://<uri_domain>/device/<device id>`, and `<device id>` must equal the
        session's `client_id`.

        Returns:
            `True` when the device id embedded in the certificate equals `session.client_id`;
            `False` when the session has no ssl object, the peer sent no certificate, the
            certificate has no SAN extension, the SAN has no URI entry, or the URI does not match.

        """
        if not session.ssl_object:
            return False

        der_cert = session.ssl_object.getpeercert(binary_form=True)
        if not der_cert:
            return False

        cert = x509.load_der_x509_certificate(der_cert, backend=default_backend())

        try:
            san = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
        except x509.ExtensionNotFound:
            logger.warning("No SAN extension found.")
            return False

        uris = san.value.get_values_for_type(x509.UniformResourceIdentifier)
        if not uris:
            # A SAN made only of DNS/IP entries carries no device identity; reject it
            # instead of failing with an IndexError on `uris[0]`.
            logger.warning("No URI entry found in SAN extension.")
            return False

        # The anchored pattern embeds the escaped domain, so a successful match already
        # guarantees that `uri_domain` is part of the URI.
        pattern = rf"^spiffe://{re.escape(self.config.uri_domain)}/device/([^/]+)$"
        match = re.match(pattern, uris[0])
        if not match:
            return False

        return match.group(1) == session.client_id

    @dataclass
    class Config:
        """Configuration for the UserAuthCertPlugin."""

        uri_domain: str
        """The domain that is expected as part of the device certificate's spiffe (e.g. test.amqtt.io)"""

91 

92 

def generate_root_creds(country: str, state: str, locality: str,
                        org_name: str, cn: str,
                        validity_days: int = 3650) -> tuple[rsa.RSAPrivateKey, Certificate]:
    """Generate a CA private key and a self-signed CA certificate.

    Args:
        country: value for the subject/issuer COUNTRY_NAME attribute.
        state: value for the STATE_OR_PROVINCE_NAME attribute.
        locality: value for the LOCALITY_NAME attribute.
        org_name: value for the ORGANIZATION_NAME attribute.
        cn: value for the COMMON_NAME attribute.
        validity_days: number of days the certificate is valid, counted from now
            (defaults to 3650, i.e. roughly 10 years).

    Returns:
        A `(private key, certificate)` tuple; the certificate is signed with the returned key.

    """
    # generate the private key for the CA
    ca_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=4096,
    )

    # subject and issuer are the same name because the certificate is self-signed
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, country),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, state),
        x509.NameAttribute(NameOID.LOCALITY_NAME, locality),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, org_name),
        x509.NameAttribute(NameOID.COMMON_NAME, cn),
    ])

    # sample the clock once so both ends of the validity window share the same base
    now = datetime.now(UTC)

    # build and sign the self-signed certificate
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(ca_key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=validity_days))
        .add_extension(
            x509.BasicConstraints(ca=True, path_length=None),
            critical=True,
        )
        .add_extension(
            x509.SubjectKeyIdentifier.from_public_key(ca_key.public_key()),
            critical=False,
        )
        .add_extension(
            # only certificate and CRL signing are enabled for this key
            x509.KeyUsage(
                key_cert_sign=True,
                crl_sign=True,
                digital_signature=False,
                key_encipherment=False,
                content_commitment=False,
                data_encipherment=False,
                key_agreement=False,
                encipher_only=False,
                decipher_only=False,
            ),
            critical=True,
        )
        .sign(ca_key, hashes.SHA256())
    )

    return ca_key, cert

145 

146 

def generate_server_csr(country: str, org_name: str, cn: str) -> tuple[rsa.RSAPrivateKey, CertificateSigningRequest]:
    """Create a 2048-bit RSA key for the server along with a matching certificate-signing-request.

    The request's subject is built from `country`, `org_name` and `cn`; its SAN extension
    lists `cn` as a DNS name together with the IPv4 address 127.0.0.1.
    """
    server_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    subject = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, country),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, org_name),
        x509.NameAttribute(NameOID.COMMON_NAME, cn),
    ])
    alt_names = x509.SubjectAlternativeName([
        x509.DNSName(cn),
        x509.IPAddress(IPv4Address("127.0.0.1")),
    ])

    builder = x509.CertificateSigningRequestBuilder().subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    request = builder.sign(server_key, hashes.SHA256())

    return server_key, request

169 

170 

def generate_device_csr(country: str, org_name: str, common_name: str,
                        uri_san: str, dns_san: str
                        ) -> tuple[rsa.RSAPrivateKey, CertificateSigningRequest]:
    """Create a 2048-bit RSA key for a device along with a matching certificate-signing-request.

    The request's subject is built from `country`, `org_name` and `common_name`; its SAN
    extension carries `uri_san` as a URI entry followed by `dns_san` as a DNS name.
    """
    device_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    subject = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, country),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, org_name),
        x509.NameAttribute(NameOID.COMMON_NAME, common_name),
    ])
    alt_names = x509.SubjectAlternativeName([
        x509.UniformResourceIdentifier(uri_san),
        x509.DNSName(dns_san),
    ])

    builder = x509.CertificateSigningRequestBuilder().subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    request = builder.sign(device_key, hashes.SHA256())

    return device_key, request

196 

197 

def sign_csr(csr: CertificateSigningRequest,
             ca_key: rsa.RSAPrivateKey,
             ca_cert: Certificate, validity_days: int = 365) -> Certificate:
    """Sign a csr with CA credentials.

    The issued certificate takes its subject and public key from `csr`, its issuer name from
    `ca_cert`, and is marked as a non-CA certificate. The csr's SubjectAlternativeName extension
    is copied when present; a csr without one is still signed, and a warning is logged.

    Args:
        csr: the certificate-signing-request to turn into a certificate.
        ca_key: private key used to sign the new certificate.
        ca_cert: certificate whose subject becomes the issuer and whose public key
            is used for the AuthorityKeyIdentifier extension.
        validity_days: number of days the certificate is valid, counted from now.

    Returns:
        The signed certificate.

    """
    # sample the clock once so both ends of the validity window share the same base
    now = datetime.now(UTC)

    builder = (
        x509.CertificateBuilder()
        .subject_name(csr.subject)
        .issuer_name(ca_cert.subject)
        .public_key(csr.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + timedelta(days=validity_days))
        .add_extension(
            x509.BasicConstraints(ca=False, path_length=None),
            critical=True,
        )
    )

    try:
        san = csr.extensions.get_extension_for_class(x509.SubjectAlternativeName)
    except x509.ExtensionNotFound:
        logger.warning("CSR has no SAN extension; issuing certificate without one.")
    else:
        builder = builder.add_extension(san.value, critical=False)

    return (
        builder
        .add_extension(
            x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_cert.public_key()),  # type: ignore[arg-type]
            critical=False,
        )
        .sign(ca_key, hashes.SHA256())
    )

224 

225 

def load_ca(ca_key_fn: str, ca_crt_fn: str) -> tuple[rsa.RSAPrivateKey, Certificate]:
    """Read a PEM-encoded private key and a PEM-encoded certificate from disk.

    The key file is loaded without a password, so it must be stored unencrypted.
    Returns the `(private key, certificate)` pair.
    """
    key_pem = Path(ca_key_fn).read_bytes()
    ca_key: rsa.RSAPrivateKey = serialization.load_pem_private_key(key_pem, password=None)  # type: ignore[assignment]

    crt_pem = Path(ca_crt_fn).read_bytes()
    ca_cert = x509.load_pem_x509_certificate(crt_pem)

    return ca_key, ca_cert

233 

234 

def write_key_and_crt(key: rsa.RSAPrivateKey, crt: Certificate,
                      prefix: str, path: Path | None = None) -> None:
    """Create pem-encoded files for key and certificate.

    Writes `<prefix>.crt` and `<prefix>.key` into `path` (the current directory when
    `path` is not given). The private key is stored unencrypted, so the key file's
    permissions are restricted to the owner (mode 0o600) before the key is written.

    Args:
        key: private key to serialize (traditional OpenSSL PEM format, no encryption).
        crt: certificate to serialize as PEM.
        prefix: file name stem shared by both output files.
        path: target directory; defaults to `Path()`.

    """
    path = path or Path()

    crt_fn = path / f"{prefix}.crt"
    key_fn = path / f"{prefix}.key"

    crt_fn.write_bytes(crt.public_bytes(serialization.Encoding.PEM))

    key_pem = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.TraditionalOpenSSL,
        serialization.NoEncryption()
    )
    # Create the file owner-only first, then force the mode in case the file already
    # existed with broader permissions; only afterwards write the key material.
    key_fn.touch(mode=0o600, exist_ok=True)
    key_fn.chmod(0o600)
    key_fn.write_bytes(key_pem)