Coverage for amqtt/contrib/auth_db/plugin.py: 82%

66 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1from dataclasses import dataclass, field 

2import logging 

3 

4from passlib.context import CryptContext 

5from sqlalchemy.ext.asyncio import create_async_engine 

6 

7from amqtt.broker import BrokerContext 

8from amqtt.contexts import Action 

9from amqtt.contrib.auth_db.managers import TopicManager, UserManager 

10from amqtt.contrib.auth_db.models import Base, PasswordHasher 

11from amqtt.errors import MQTTError 

12from amqtt.plugins.base import BaseAuthPlugin, BaseTopicPlugin 

13from amqtt.session import Session 

14 

15logger = logging.getLogger(__name__) 

16 

17 

18def default_hash_scheme() -> list[str]: 

19 """Create config dataclass defaults.""" 

20 return ["argon2", "bcrypt", "pbkdf2_sha256", "scrypt"] 

21 

22 

23class UserAuthDBPlugin(BaseAuthPlugin): 

24 

25 def __init__(self, context: BrokerContext) -> None: 

26 super().__init__(context) 

27 

28 # access the singleton and set the proper crypt context 

29 pwd_hasher = PasswordHasher() 

30 pwd_hasher.crypt_context = CryptContext(schemes=self.config.hash_schemes, deprecated="auto") 

31 

32 self._user_manager = UserManager(self.config.connection) 

33 self._engine = create_async_engine(f"{self.config.connection}") 

34 

35 async def on_broker_pre_start(self) -> None: 

36 """Sync the schema (if configured).""" 

37 if not self.config.sync_schema: 37 ↛ 39line 37 didn't jump to line 39 because the condition on line 37 was always true

38 return 

39 async with self._engine.begin() as conn: 

40 await conn.run_sync(Base.metadata.create_all) 

41 

42 async def authenticate(self, *, session: Session) -> bool | None: 

43 """Authenticate a client's session.""" 

44 if not session.username or not session.password: 44 ↛ 45line 44 didn't jump to line 45 because the condition on line 44 was never true

45 return False 

46 

47 user_auth = await self._user_manager.get_user_auth(session.username) 

48 if not user_auth: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true

49 return False 

50 

51 return bool(session.password) and user_auth.verify_password(session.password) 

52 

53 @dataclass 

54 class Config: 

55 """Configuration for DB authentication.""" 

56 

57 connection: str 

58 """SQLAlchemy connection string for the asyncio version of the database connector: 

59 

60 - `mysql+aiomysql://user:password@host:port/dbname` 

61 - `postgresql+asyncpg://user:password@host:port/dbname` 

62 - `sqlite+aiosqlite:///dbfilename.db` 

63 """ 

64 sync_schema: bool = False 

65 """Use SQLAlchemy to create / update the database schema.""" 

66 hash_schemes: list[str] = field(default_factory=default_hash_scheme) 

67 """list of hash schemes to use for passwords""" 

68 

69 

70class TopicAuthDBPlugin(BaseTopicPlugin): 

71 

72 def __init__(self, context: BrokerContext) -> None: 

73 super().__init__(context) 

74 

75 self._topic_manager = TopicManager(self.config.connection) 

76 self._engine = create_async_engine(f"{self.config.connection}") 

77 

78 async def on_broker_pre_start(self) -> None: 

79 """Sync the schema (if configured).""" 

80 if not self.config.sync_schema: 80 ↛ 82line 80 didn't jump to line 82 because the condition on line 80 was always true

81 return 

82 async with self._engine.begin() as conn: 

83 await conn.run_sync(Base.metadata.create_all) 

84 

85 async def topic_filtering( 

86 self, *, session: Session | None = None, topic: str | None = None, action: Action | None = None 

87 ) -> bool | None: 

88 if not session or not session.username or not topic: 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true

89 return None 

90 

91 try: 

92 topic_auth = await self._topic_manager.get_topic_auth(session.username) 

93 topic_list = getattr(topic_auth, f"{action}_acl") 

94 except MQTTError: 

95 return False 

96 

97 return topic in topic_list 

98 

99 @dataclass 

100 class Config: 

101 """Configuration for DB topic filtering.""" 

102 

103 connection: str 

104 """SQLAlchemy connection string for the asyncio version of the database connector: 

105 

106 - `mysql+aiomysql://user:password@host:port/dbname` 

107 - `postgresql+asyncpg://user:password@host:port/dbname` 

108 - `sqlite+aiosqlite:///dbfilename.db` 

109 """ 

110 sync_schema: bool = False 

111 """Use SQLAlchemy to create / update the database schema."""