Coverage for amqtt/plugins/authentication.py: 86%
82 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1from dataclasses import dataclass, field
2from pathlib import Path
4from passlib.apps import custom_app_context as pwd_context
6from amqtt.broker import BrokerContext
7from amqtt.contexts import BaseContext
8from amqtt.plugins.base import BaseAuthPlugin
9from amqtt.session import Session
_PARTS_EXPECTED_LENGTH = 2  # A valid password-file line splits on the first ':' into exactly two parts: username and password hash
class AnonymousAuthPlugin(BaseAuthPlugin):
    """Auth plugin that can let clients connect without credentials.

    Behaviour is driven by the ``allow-anonymous`` configuration option,
    which is treated as enabled when it is not set.
    """

    def __init__(self, context: BaseContext) -> None:
        super().__init__(context)

        # When the option is absent, anonymous access is permitted.
        self._allow_anonymous = self._get_config_option("allow-anonymous", True)  # noqa: FBT003

    async def authenticate(self, *, session: Session) -> bool:
        """Decide whether ``session`` may connect.

        Returns ``False`` when the base-class check fails. Otherwise returns
        ``True`` if anonymous access is enabled (the session is then flagged
        as anonymous) or if the session carries a username; ``False`` if it
        carries none.
        """
        if not await super().authenticate(session=session):
            return False

        logger = self.context.logger

        if self._allow_anonymous:
            logger.debug("Authentication success: config allows anonymous")
            session.is_anonymous = True
            return True

        # Anonymous access is disabled: a non-empty username is required.
        has_username = bool(session and session.username)
        if not has_username:
            logger.debug("Authentication failure: session has no username")
            return False

        logger.debug(f"Authentication success: session has username '{session.username}'")
        return True

    @dataclass
    class Config:
        """Settings accepted by AnonymousAuthPlugin."""

        allow_anonymous: bool = field(default=True)
        """Allow all anonymous authentication (even with _no_ username)."""
class FileAuthPlugin(BaseAuthPlugin):
    """Authentication plugin based on a file-stored user database.

    The file named by the ``password-file`` configuration option is read once,
    at construction time, into an in-memory ``username -> password hash`` map.
    """

    def __init__(self, context: BrokerContext) -> None:
        super().__init__(context)
        self._users: dict[str, str] = {}
        self._read_password_file()

    def _read_password_file(self) -> None:
        """Read the password file and populate the user dictionary.

        Blank lines and lines starting with ``#`` are skipped. Every other line
        must have the form ``username:hash``; only the first ``:`` separates the
        two, so the hash itself may contain colons. Malformed lines are logged
        and ignored. A missing or unreadable file is logged and leaves the user
        dictionary as it was; no exception is raised for those cases.
        """
        password_file = self._get_config_option("password-file", None)
        if not password_file:
            self.context.logger.warning("Configuration parameter 'password-file' not found")
            return

        # The option may be given as a plain string or as a path object.
        path = Path(password_file) if isinstance(password_file, str) else password_file

        try:
            with path.open(mode="r", encoding="utf-8") as handle:
                self.context.logger.debug(f"Reading user database from {password_file}")
                for raw_line in handle:
                    line = raw_line.strip()
                    if not line or line.startswith("#"):  # Skip empty lines and comments
                        continue

                    parts = line.split(":", maxsplit=1)
                    if len(parts) != _PARTS_EXPECTED_LENGTH:
                        self.context.logger.warning(f"Malformed line in password file: {line}")
                        continue

                    username, pwd_hash = parts
                    self._users[username] = pwd_hash
                    self.context.logger.debug(f"User '{username}' loaded")
            self.context.logger.info(f"{len(self._users)} user(s) loaded from {password_file}")
        except FileNotFoundError:
            self.context.logger.warning(f"Password file '{password_file}' not found")
        except ValueError:
            # Includes UnicodeDecodeError raised while decoding the file as UTF-8.
            self.context.logger.exception(f"Malformed password file '{password_file}'")
        except OSError:
            self.context.logger.exception(f"Unexpected error reading password file '{password_file}'")

    async def authenticate(self, *, session: Session) -> bool | None:
        """Authenticate users based on the file-stored user database.

        Returns:
            ``True`` when the session's password verifies against the stored hash.
            ``None`` when the session carries no username.
            ``False`` in every other case: the base-class check failed, there is
            no session, the user is unknown, no password was supplied, the
            password does not match, or the stored hash cannot be verified.

        """
        if not await super().authenticate(session=session):
            return False

        logger = self.context.logger

        if not session:
            logger.debug("Authentication failure: no session provided")
            return False

        if not session.username:
            logger.debug("Authentication failure: no username provided in session")
            return None

        stored_hash = self._users.get(session.username)
        if not stored_hash:
            logger.debug(f"Authentication failure: no hash found for user '{session.username}'")
            return False

        # A username may arrive without a password; handing None to the hash
        # verifier raises instead of returning False, so reject it here.
        if session.password is None:
            logger.debug(f"Authentication failure: no password provided for user '{session.username}'")
            return False

        try:
            verified = pwd_context.verify(session.password, stored_hash)
        except (TypeError, ValueError):
            # The verifier raises for an unrecognised hash format or an
            # unsupported secret type; treat both as a failed login rather
            # than letting the exception escape the plugin.
            logger.warning(f"Authentication failure: unable to verify password for user '{session.username}'")
            return False

        if verified:
            logger.debug(f"Authentication success for user '{session.username}'")
            return True

        logger.debug(f"Authentication failure: password mismatch for user '{session.username}'")
        return False

    @dataclass
    class Config:
        """Configuration for FileAuthPlugin."""

        password_file: str | Path | None = None
        """Path to file with `username:password` pairs, one per line. All passwords are encoded using sha-512."""