Coverage for amqtt/contrib/auth_db/user_mgr_cli.py: 86%
98 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1import asyncio
2import logging
3from pathlib import Path
4from typing import Annotated
6import click
7import passlib
8import typer
10from amqtt.contrib.auth_db import DBType, db_connection_str
11from amqtt.contrib.auth_db.managers import UserManager
12from amqtt.errors import MQTTError
14logging.basicConfig(level=logging.INFO, format="%(message)s")
15logger = logging.getLogger(__name__)
16user_app = typer.Typer(no_args_is_help=True)
19@user_app.callback()
20def main(
21 ctx: typer.Context,
22 db_type: Annotated[DBType, typer.Option(..., "--db", "-d", help="db type", show_default=False)],
23 db_username: Annotated[str, typer.Option("--username", "-u", help="db username", show_default=False)] = "",
24 db_port: Annotated[int, typer.Option("--port", "-p", help="database port (defaults to db type)", show_default=False)] = 0,
25 db_host: Annotated[str, typer.Option("--host", "-h", help="database host")] = "localhost",
26 db_filename: Annotated[str, typer.Option("--file", "-f", help="database file name (sqlite only)")] = "auth.db",
27) -> None:
28 """Command line interface to list, create, remove and add clients.
30 Passwords are not allowed to be passed via the command line for security reasons. You will be prompted for database
31 password (if applicable) and the client id's password.
33 If you need to create users programmatically, see `amqtt.contrib.auth_db.managers.UserManager` which provides
34 the underlying functionality to this command line interface.
35 """
36 if db_type == DBType.SQLITE and ctx.invoked_subcommand == "sync" and not Path(db_filename).exists():
37 pass
38 elif db_type == DBType.SQLITE and not Path(db_filename).exists(): 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true
39 logger.error(f"SQLite option could not find '{db_filename}'")
40 raise typer.Exit(code=1)
41 elif db_type != DBType.SQLITE and not db_username:
42 logger.error("DB access requires a username be provided.")
43 raise typer.Exit(code=1)
45 ctx.obj = {"type": db_type, "username": db_username, "host": db_host, "port": db_port, "filename": db_filename}
48@user_app.command(name="sync")
49def db_sync(ctx: typer.Context) -> None:
50 """Create the table and schema for username and hashed password.
52 Non-destructive if run multiple times. To clear the whole table, need to drop it manually.
53 """
54 async def run_sync() -> None:
55 connect = db_connection_str(ctx.obj["type"], ctx.obj["username"], ctx.obj["host"], ctx.obj["port"], ctx.obj["filename"])
56 mgr = UserManager(connect)
57 try:
58 await mgr.db_sync()
59 except MQTTError as me:
60 logger.critical("Could not sync schema on db.")
61 raise typer.Exit(code=1) from me
63 asyncio.run(run_sync())
64 logger.info("Success: database synced.")
67@user_app.command(name="list")
68def list_user_auths(ctx: typer.Context) -> None:
69 """List all Client IDs (in alphabetical order). Will also display the hashed passwords."""
71 async def run_list() -> None:
72 connect = db_connection_str(ctx.obj["type"], ctx.obj["username"], ctx.obj["host"], ctx.obj["port"], ctx.obj["filename"])
73 mgr = UserManager(connect)
74 user_count = 0
75 for user in await mgr.list_user_auths():
76 user_count += 1
77 logger.info(user)
79 if not user_count:
80 logger.info("No client authentications exist.")
82 asyncio.run(run_list())
85@user_app.command(name="add")
86def create_user_auth(
87 ctx: typer.Context,
88 client_id: Annotated[str, typer.Option("--client-id", "-c", help="id for the new client")],
89 ) -> None:
90 """Create a new user with a client id and password (prompted)."""
91 async def run_create() -> None:
92 connect = db_connection_str(ctx.obj["type"], ctx.obj["username"], ctx.obj["host"], ctx.obj["port"],
93 ctx.obj["filename"])
94 mgr = UserManager(connect)
95 client_password = click.prompt("Enter the client's password", hide_input=True)
96 if not client_password.strip():
97 logger.info("Error: client password cannot be empty.")
98 raise typer.Exit(1)
99 try:
100 user = await mgr.create_user_auth(client_id, client_password.strip())
101 except passlib.exc.MissingBackendError as mbe:
102 logger.info(f"Please install backend: {mbe}")
103 raise typer.Exit(code=1) from mbe
105 if not user: 105 ↛ 106line 105 didn't jump to line 106 because the condition on line 105 was never true
106 logger.info(f"Error: could not create user: {client_id}")
107 raise typer.Exit(code=1)
109 logger.info(f"Success: created {user}")
111 asyncio.run(run_create())
114@user_app.command(name="rm")
115def remove_user_auth(ctx: typer.Context,
116 client_id: Annotated[str, typer.Option("--client-id", "-c", help="id for the client to remove")]) -> None:
117 """Remove a client from the authentication database."""
118 async def run_remove() -> None:
119 connect = db_connection_str(ctx.obj["type"], ctx.obj["username"], ctx.obj["host"], ctx.obj["port"],
120 ctx.obj["filename"])
121 mgr = UserManager(connect)
122 user = await mgr.get_user_auth(client_id)
123 if not user:
124 logger.info(f"Error: client '{client_id}' does not exist.")
125 raise typer.Exit(1)
127 if not click.confirm(f"Please confirm the removal of '{client_id}'?"):
128 raise typer.Exit(0)
130 user = await mgr.delete_user_auth(client_id)
131 if not user: 131 ↛ 132line 131 didn't jump to line 132 because the condition on line 131 was never true
132 logger.info(f"Error: client '{client_id}' does not exist.")
133 raise typer.Exit(1)
135 logger.info(f"Success: '{user.username}' was removed.")
137 asyncio.run(run_remove())
140@user_app.command(name="pwd")
141def change_password(
142 ctx: typer.Context,
143 client_id: Annotated[str, typer.Option("--client-id", "-c", help="id for the new client")],
144 ) -> None:
145 """Update a user's password (prompted)."""
146 async def run_password() -> None:
147 client_password = click.prompt("Enter the client's new password", hide_input=True)
148 if not client_password.strip():
149 logger.error("Error: client password cannot be empty.")
150 raise typer.Exit(1)
151 connect = db_connection_str(ctx.obj["type"], ctx.obj["username"], ctx.obj["host"], ctx.obj["port"],
152 ctx.obj["filename"])
153 mgr = UserManager(connect)
154 await mgr.update_user_auth_password(client_id, client_password.strip())
155 logger.info(f"Success: client '{client_id}' password updated.")
157 asyncio.run(run_password())
160if __name__ == "__main__": 160 ↛ 161line 160 didn't jump to line 161 because the condition on line 160 was never true
161 user_app()