Coverage for amqtt/plugins/authentication.py: 86%

82 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1from dataclasses import dataclass, field 

2from pathlib import Path 

3 

4from passlib.apps import custom_app_context as pwd_context 

5 

6from amqtt.broker import BrokerContext 

7from amqtt.contexts import BaseContext 

8from amqtt.plugins.base import BaseAuthPlugin 

9from amqtt.session import Session 

10 

11_PARTS_EXPECTED_LENGTH = 2 # Expected number of parts in a valid line 

12 

13 

14class AnonymousAuthPlugin(BaseAuthPlugin): 

15 """Authentication plugin allowing anonymous access.""" 

16 

17 def __init__(self, context: BaseContext) -> None: 

18 super().__init__(context) 

19 

20 # Default to allowing anonymous 

21 self._allow_anonymous = self._get_config_option("allow-anonymous", True) # noqa: FBT003 

22 

23 async def authenticate(self, *, session: Session) -> bool: 

24 authenticated = await super().authenticate(session=session) 

25 if authenticated: 25 ↛ 36line 25 didn't jump to line 36 because the condition on line 25 was always true

26 

27 if self._allow_anonymous: 

28 self.context.logger.debug("Authentication success: config allows anonymous") 

29 session.is_anonymous = True 

30 return True 

31 

32 if session and session.username: 

33 self.context.logger.debug(f"Authentication success: session has username '{session.username}'") 

34 return True 

35 self.context.logger.debug("Authentication failure: session has no username") 

36 return False 

37 

38 @dataclass 

39 class Config: 

40 """Configuration for AnonymousAuthPlugin.""" 

41 

42 allow_anonymous: bool = field(default=True) 

43 """Allow all anonymous authentication (even with _no_ username).""" 

44 

45 

46class FileAuthPlugin(BaseAuthPlugin): 

47 """Authentication plugin based on a file-stored user database.""" 

48 

49 def __init__(self, context: BrokerContext) -> None: 

50 super().__init__(context) 

51 self._users: dict[str, str] = {} 

52 self._read_password_file() 

53 

54 def _read_password_file(self) -> None: 

55 """Read the password file and populates the user dictionary.""" 

56 password_file = self._get_config_option("password-file", None) 

57 if not password_file: 

58 self.context.logger.warning("Configuration parameter 'password-file' not found") 

59 return 

60 

61 try: 

62 file = password_file 

63 if isinstance(file, str): 63 ↛ 64line 63 didn't jump to line 64 because the condition on line 63 was never true

64 file = Path(file) 

65 with file.open(mode="r", encoding="utf-8") as file: 

66 self.context.logger.debug(f"Reading user database from {password_file}") 

67 for _line in file: 

68 line = _line.strip() 

69 if line and not line.startswith("#"): # Skip empty lines and comments 

70 parts = line.split(":", maxsplit=1) 

71 if len(parts) == _PARTS_EXPECTED_LENGTH: 71 ↛ 76line 71 didn't jump to line 76 because the condition on line 71 was always true

72 username, pwd_hash = parts 

73 self._users[username] = pwd_hash 

74 self.context.logger.debug(f"User '{username}' loaded") 

75 else: 

76 self.context.logger.warning(f"Malformed line in password file: {line}") 

77 self.context.logger.info(f"{len(self._users)} user(s) loaded from {password_file}") 

78 except FileNotFoundError: 

79 self.context.logger.warning(f"Password file '{password_file}' not found") 

80 except ValueError: 

81 self.context.logger.exception(f"Malformed password file '{password_file}'") 

82 except OSError: 

83 self.context.logger.exception(f"Unexpected error reading password file '{password_file}'") 

84 

85 async def authenticate(self, *, session: Session) -> bool | None: 

86 """Authenticate users based on the file-stored user database.""" 

87 authenticated = await super().authenticate(session=session) 

88 if authenticated: 88 ↛ 107line 88 didn't jump to line 107 because the condition on line 88 was always true

89 if not session: 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true

90 self.context.logger.debug("Authentication failure: no session provided") 

91 return False 

92 

93 if not session.username: 

94 self.context.logger.debug("Authentication failure: no username provided in session") 

95 return None 

96 

97 hash_session_username = self._users.get(session.username) 

98 if not hash_session_username: 

99 self.context.logger.debug(f"Authentication failure: no hash found for user '{session.username}'") 

100 return False 

101 

102 if pwd_context.verify(session.password, hash_session_username): 

103 self.context.logger.debug(f"Authentication success for user '{session.username}'") 

104 return True 

105 

106 self.context.logger.debug(f"Authentication failure: password mismatch for user '{session.username}'") 

107 return False 

108 

109 @dataclass 

110 class Config: 

111 """Configuration for FileAuthPlugin.""" 

112 

113 password_file: str | Path | None = None 

114 """Path to file with `username:password` pairs, one per line. All passwords are encoded using sha-512."""