Coverage for amqtt/contrib/auth_db/plugin.py: 82%
66 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
from dataclasses import dataclass, field
import logging

from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import create_async_engine

from amqtt.broker import BrokerContext
from amqtt.contexts import Action
from amqtt.contrib.auth_db.managers import TopicManager, UserManager
from amqtt.contrib.auth_db.models import Base, PasswordHasher
from amqtt.errors import MQTTError
from amqtt.plugins.base import BaseAuthPlugin, BaseTopicPlugin
from amqtt.session import Session

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def default_hash_scheme() -> list[str]:
    """Return the default password hash scheme names for the plugin config.

    Used as a dataclass ``default_factory``: a new list is built on every
    call so that config instances never share one mutable default.
    """
    return ["argon2", "bcrypt", "pbkdf2_sha256", "scrypt"]
class UserAuthDBPlugin(BaseAuthPlugin):
    """Auth plugin that checks client credentials against database user records.

    User lookups go through a ``UserManager`` built from the configured
    connection string; a separate async engine is kept only for the optional
    schema sync performed before the broker starts.
    """

    def __init__(self, context: BrokerContext) -> None:
        """Configure password hashing and open the database handles.

        Args:
            context: broker context handed to the base plugin.
        """
        super().__init__(context)

        # access the singleton and set the proper crypt context
        pwd_hasher = PasswordHasher()
        pwd_hasher.crypt_context = CryptContext(schemes=self.config.hash_schemes, deprecated="auto")

        self._user_manager = UserManager(self.config.connection)
        self._engine = create_async_engine(f"{self.config.connection}")

    async def on_broker_pre_start(self) -> None:
        """Sync the schema (if configured).

        When ``sync_schema`` is enabled, runs ``Base.metadata.create_all``
        inside a transaction on the plugin's engine; otherwise does nothing.
        """
        if not self.config.sync_schema:
            return

        async with self._engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

    async def authenticate(self, *, session: Session) -> bool | None:
        """Authenticate a client's session.

        Args:
            session: client session carrying the username and password.

        Returns:
            ``False`` when the username or password is missing, when no user
            record is found, or when the lookup fails with ``MQTTError``;
            otherwise the result of verifying the password against the record.
        """
        if not session.username or not session.password:
            return False

        try:
            user_auth = await self._user_manager.get_user_auth(session.username)
        except MQTTError:
            # Deny rather than propagate, mirroring how the topic plugin in
            # this module treats MQTTError from its manager.
            return False

        if not user_auth:
            return False

        # The password is known to be non-empty here (checked above).
        return user_auth.verify_password(session.password)

    @dataclass
    class Config:
        """Configuration for DB authentication."""

        connection: str
        """SQLAlchemy connection string for the asyncio version of the database connector:

        - `mysql+aiomysql://user:password@host:port/dbname`
        - `postgresql+asyncpg://user:password@host:port/dbname`
        - `sqlite+aiosqlite:///dbfilename.db`
        """
        sync_schema: bool = False
        """Use SQLAlchemy to create / update the database schema."""
        hash_schemes: list[str] = field(default_factory=default_hash_scheme)
        """list of hash schemes to use for passwords"""
class TopicAuthDBPlugin(BaseTopicPlugin):
    """Topic plugin that checks a client's topic access against database ACLs.

    ACL lookups go through a ``TopicManager`` built from the configured
    connection string; a separate async engine is kept only for the optional
    schema sync performed before the broker starts.
    """

    def __init__(self, context: BrokerContext) -> None:
        """Open the database handles.

        Args:
            context: broker context handed to the base plugin.
        """
        super().__init__(context)

        self._topic_manager = TopicManager(self.config.connection)
        self._engine = create_async_engine(f"{self.config.connection}")

    async def on_broker_pre_start(self) -> None:
        """Sync the schema (if configured).

        When ``sync_schema`` is enabled, runs ``Base.metadata.create_all``
        inside a transaction on the plugin's engine; otherwise does nothing.
        """
        if not self.config.sync_schema:
            return

        async with self._engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

    async def topic_filtering(
        self, *, session: Session | None = None, topic: str | None = None, action: Action | None = None
    ) -> bool | None:
        """Decide whether the session's user may perform ``action`` on ``topic``.

        Args:
            session: client session; its username selects the ACL record.
            topic: topic being checked.
            action: requested action; selects the ``<action>_acl`` attribute
                of the user's ACL record.

        Returns:
            ``None`` when the session, username, or topic is missing.
            ``False`` when the lookup fails with ``MQTTError``, or when no ACL
            is available for this user and action (no record, or ``action`` is
            ``None`` or has no matching ``*_acl`` attribute). Otherwise,
            whether ``topic`` is contained in the selected ACL.
        """
        if not session or not session.username or not topic:
            return None

        try:
            topic_auth = await self._topic_manager.get_topic_auth(session.username)
            # Default to None so a missing record or an unset/unknown action
            # becomes an explicit denial instead of an AttributeError.
            topic_list = getattr(topic_auth, f"{action}_acl", None)
        except MQTTError:
            return False

        if topic_list is None:
            return False

        # Plain containment test; how entries compare to the topic depends on
        # the ACL element type, which is defined outside this module.
        return topic in topic_list

    @dataclass
    class Config:
        """Configuration for DB topic filtering."""

        connection: str
        """SQLAlchemy connection string for the asyncio version of the database connector:

        - `mysql+aiomysql://user:password@host:port/dbname`
        - `postgresql+asyncpg://user:password@host:port/dbname`
        - `sqlite+aiosqlite:///dbfilename.db`
        """
        sync_schema: bool = False
        """Use SQLAlchemy to create / update the database schema."""