Coverage for amqtt/contrib/auth_db/topic_mgr_cli.py: 81%
86 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1import asyncio
2import contextlib
3import logging
4from pathlib import Path
5from typing import Annotated
7import typer
9from amqtt.contexts import Action
10from amqtt.contrib.auth_db import DBType, db_connection_str
11from amqtt.contrib.auth_db.managers import TopicManager, UserManager
12from amqtt.errors import MQTTError
# Log bare messages (no level or timestamp prefix) at INFO and above; the commands below report their results via logging.
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger(__name__)
# Typer application that the callback and commands in this module are registered on; shows help when run without arguments.
topic_app = typer.Typer(no_args_is_help=True)
@topic_app.callback()
def main(
    ctx: typer.Context,
    db_type: Annotated[DBType, typer.Option("--db", "-d", help="db type", count=False)],
    db_username: Annotated[str, typer.Option("--username", "-u", help="db username", show_default=False)] = "",
    db_port: Annotated[int, typer.Option("--port", "-p", help="database port (defaults to db type)", show_default=False)] = 0,
    db_host: Annotated[str, typer.Option("--host", "-h", help="database host")] = "localhost",
    db_filename: Annotated[str, typer.Option("--file", "-f", help="database file name (sqlite only)")] = "auth.db",
) -> None:
    """Command line interface to add / remove topic authorization.

    Passwords are not allowed to be passed via the command line for security reasons. You will be prompted for database
    password (if applicable).

    If you need to manage topic authorizations programmatically, see `amqtt.contrib.auth_db.managers.TopicManager`
    which provides the underlying functionality to this command line interface.
    """
    # NOTE: Typer renders the docstring above as the CLI help text, so it is user-facing.
    if db_type == DBType.SQLITE:
        # `sync` is the only subcommand allowed to run while the SQLite file is missing
        # (presumably because syncing creates it -- confirm against `db_sync`).
        if ctx.invoked_subcommand != "sync" and not Path(db_filename).exists():
            logger.error(f"SQLite option could not find '{db_filename}'")
            raise typer.Exit(code=1)
    elif not db_username:
        # Every non-SQLite backend requires a username.
        logger.error("DB access requires a username be provided.")
        raise typer.Exit(code=1)

    # Connection settings shared with the subcommands through the Typer context.
    ctx.obj = {"type": db_type, "username": db_username, "host": db_host, "port": db_port, "filename": db_filename}
@topic_app.command(name="sync")
def db_sync(ctx: typer.Context) -> None:
    """Create the table and schema for username and topic lists for subscribe, publish or receive.

    Non-destructive if run multiple times. To clear the whole table, need to drop it manually.
    """
    # The docstring above is kept verbatim: Typer displays it as this command's help text.
    settings = ctx.obj

    async def _sync_schema() -> None:
        # Build the connection string from the settings stored by the app callback.
        dsn = db_connection_str(
            settings["type"],
            settings["username"],
            settings["host"],
            settings["port"],
            settings["filename"],
        )
        manager = UserManager(dsn)
        try:
            await manager.db_sync()
        except MQTTError as exc:
            # Report the failure and exit non-zero, keeping the original error as the cause.
            logger.critical("Could not sync schema on db.")
            raise typer.Exit(code=1) from exc

    asyncio.run(_sync_schema())
    logger.info("Success: database synced.")
@topic_app.command(name="list")
def list_clients(ctx: typer.Context) -> None:
    """List the topic authorizations stored for each client."""
    # NOTE: Typer renders the docstring above as this command's help text.

    async def run_list() -> None:
        connect = db_connection_str(ctx.obj["type"], ctx.obj["username"], ctx.obj["host"], ctx.obj["port"], ctx.obj["filename"])
        mgr = TopicManager(connect)

        # Materialize the result so the emptiness check works for any iterable the manager returns.
        topic_auths = list(await mgr.list_topic_auths())
        if not topic_auths:
            logger.info("No client authorizations exist.")
            return

        for topic_auth in topic_auths:
            logger.info(topic_auth)

    asyncio.run(run_list())
@topic_app.command(name="add")
def add_topic_allowance(
    ctx: typer.Context,
    topic: Annotated[str, typer.Argument(help="topic to allow", show_default=False)],
    client_id: Annotated[str, typer.Option("--client-id", "-c", help="id for the client", show_default=False)],
    action: Annotated[Action, typer.Option("--action", "-a", help="action for topic to allow", show_default=False)],
) -> None:
    """Add a topic to a client's allow list for the given action."""
    # NOTE: Typer renders the docstring above as this command's help text.

    async def run_add() -> None:
        connect = db_connection_str(ctx.obj["type"], ctx.obj["username"], ctx.obj["host"], ctx.obj["port"],
                                    ctx.obj["filename"])
        mgr = TopicManager(connect)

        # Best effort: try to create the client's topic-auth record and ignore an MQTTError
        # (presumably raised when the record already exists -- confirm against TopicManager).
        with contextlib.suppress(MQTTError):
            await mgr.create_topic_auth(client_id)

        topic_auth = await mgr.get_topic_auth(client_id)
        if not topic_auth:
            # Failure paths log at ERROR, matching the app callback, and exit with code 1.
            logger.error(f"Topic auth doesn't exist for '{client_id}'")
            raise typer.Exit(code=1)

        # Refuse to add a topic that is already allowed for this action.
        if topic in [allowed_topic.topic for allowed_topic in topic_auth.get_topic_list(action)]:
            logger.error(f"Topic '{topic}' already exists for '{action}'.")
            raise typer.Exit(code=1)

        await mgr.add_allowed_topic(client_id, topic, action)

        logger.info(f"Success: topic '{topic}' added to {action} for '{client_id}'")

    asyncio.run(run_add())
@topic_app.command(name="rm")
def remove_topic_allowance(
    ctx: typer.Context,
    client_id: Annotated[str, typer.Option("--client-id", "-c", help="id for the client")],
    action: Annotated[Action, typer.Option("--action", "-a", help="action to remove the topic from")],
    topic: Annotated[str, typer.Argument(help="topic to remove")],
) -> None:
    """Remove a topic from a client's allow list for the given action."""
    # NOTE: Typer renders the docstring above as this command's help text.

    async def run_remove() -> None:
        connect = db_connection_str(ctx.obj["type"], ctx.obj["username"], ctx.obj["host"], ctx.obj["port"],
                                    ctx.obj["filename"])
        mgr = TopicManager(connect)

        topic_auth = await mgr.get_topic_auth(client_id)

        if not topic_auth:
            # Failure paths log at ERROR, matching the app callback, and exit with code 1.
            logger.error(f"client '{client_id}' doesn't exist.")
            raise typer.Exit(code=1)

        # NOTE(review): this looks the ACL up by attribute name ("<action>_acl") and compares the raw
        # topic string against its entries, whereas `add` goes through `get_topic_list(action)` and
        # `.topic` -- confirm both forms are equivalent for the project's types.
        if topic not in getattr(topic_auth, f"{action}_acl"):
            logger.error(f"Error: topic '{topic}' not in the {action} allow list for {client_id}.")
            raise typer.Exit(code=1)

        try:
            await mgr.remove_allowed_topic(client_id, topic, action)
        except MQTTError as me:
            # Stray leading quote removed from this message.
            logger.error(f"Error: could not remove '{topic}' for client '{client_id}'.")
            raise typer.Exit(code=1) from me

        logger.info(f"Success: removed topic '{topic}' from {action} for '{client_id}'")

    asyncio.run(run_remove())
# Run the Typer application when this module is executed directly as a script.
if __name__ == "__main__":
    topic_app()