Coverage for amqtt/contrib/auth_db/user_mgr_cli.py: 86%

98 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1import asyncio 

2import logging 

3from pathlib import Path 

4from typing import Annotated 

5 

6import click 

7import passlib 

8import typer 

9 

10from amqtt.contrib.auth_db import DBType, db_connection_str 

11from amqtt.contrib.auth_db.managers import UserManager 

12from amqtt.errors import MQTTError 

13 

# Log records are rendered as the bare message only (no level or timestamp prefix), at INFO and above.
logging.basicConfig(format="%(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)

# Root Typer application; invoking it without any arguments shows the help text.
user_app = typer.Typer(no_args_is_help=True)

17 

18 

@user_app.callback()
def main(
    ctx: typer.Context,
    db_type: Annotated[DBType, typer.Option(..., "--db", "-d", help="db type", show_default=False)],
    db_username: Annotated[str, typer.Option("--username", "-u", help="db username", show_default=False)] = "",
    db_port: Annotated[int, typer.Option("--port", "-p", help="database port (defaults to db type)", show_default=False)] = 0,
    db_host: Annotated[str, typer.Option("--host", "-h", help="database host")] = "localhost",
    db_filename: Annotated[str, typer.Option("--file", "-f", help="database file name (sqlite only)")] = "auth.db",
) -> None:
    """Command line interface to list, create, remove and add clients.

    Passwords are not allowed to be passed via the command line for security reasons. You will be prompted for database
    password (if applicable) and the client id's password.

    If you need to create users programmatically, see `amqtt.contrib.auth_db.managers.UserManager` which provides
    the underlying functionality to this command line interface.
    """
    # NOTE: the docstring above is left untouched on purpose; Typer surfaces it as the CLI help text.
    if db_type == DBType.SQLITE:
        # Every subcommand except `sync` needs the SQLite file to already exist
        # (presumably `sync` is what creates it -- confirm against UserManager.db_sync).
        sqlite_file_missing = not Path(db_filename).exists()
        if sqlite_file_missing and ctx.invoked_subcommand != "sync":
            logger.error(f"SQLite option could not find '{db_filename}'")
            raise typer.Exit(code=1)
    elif not db_username:
        # Any non-SQLite backend must be given a username.
        logger.error("DB access requires a username be provided.")
        raise typer.Exit(code=1)

    # Hand the connection settings to the subcommands through the shared Typer context.
    ctx.obj = {
        "type": db_type,
        "username": db_username,
        "host": db_host,
        "port": db_port,
        "filename": db_filename,
    }

46 

47 

@user_app.command(name="sync")
def db_sync(ctx: typer.Context) -> None:
    """Create the table and schema for username and hashed password.

    Non-destructive if run multiple times. To clear the whole table, need to drop it manually.
    """
    async def run_sync() -> None:
        # Connection settings were stored on the context by the app callback.
        settings = ctx.obj
        connection_url = db_connection_str(
            *(settings[key] for key in ("type", "username", "host", "port", "filename")),
        )
        manager = UserManager(connection_url)
        try:
            await manager.db_sync()
        except MQTTError as sync_error:
            # Report the failure and exit non-zero, keeping the original error as the cause.
            logger.critical("Could not sync schema on db.")
            raise typer.Exit(code=1) from sync_error

    asyncio.run(run_sync())
    logger.info("Success: database synced.")

65 

66 

@user_app.command(name="list")
def list_user_auths(ctx: typer.Context) -> None:
    """List all Client IDs (in alphabetical order). Will also display the hashed passwords."""

    async def run_list() -> None:
        # Connection settings were stored on the context by the app callback.
        settings = ctx.obj
        connection_url = db_connection_str(
            *(settings[key] for key in ("type", "username", "host", "port", "filename")),
        )
        manager = UserManager(connection_url)

        # `listed` stays 0 when the manager yields nothing; otherwise it ends as the number of entries logged.
        listed = 0
        for listed, entry in enumerate(await manager.list_user_auths(), start=1):
            logger.info(entry)

        if listed == 0:
            logger.info("No client authentications exist.")

    asyncio.run(run_list())

83 

84 

@user_app.command(name="add")
def create_user_auth(
        ctx: typer.Context,
        client_id: Annotated[str, typer.Option("--client-id", "-c", help="id for the new client")],
        ) -> None:
    """Create a new user with a client id and password (prompted)."""
    # Import the exception explicitly: a bare `import passlib` does not by itself load the
    # `passlib.exc` submodule, so `passlib.exc.<name>` only resolves if some other module
    # happened to import it first.
    from passlib.exc import MissingBackendError

    async def run_create() -> None:
        connect = db_connection_str(ctx.obj["type"], ctx.obj["username"], ctx.obj["host"], ctx.obj["port"],
                                    ctx.obj["filename"])
        mgr = UserManager(connect)

        # The password is always prompted for (hidden input); it is never taken from the command line.
        client_password = click.prompt("Enter the client's password", hide_input=True).strip()
        if not client_password:
            # Failures are logged at ERROR level, matching the app callback and the `pwd` command.
            logger.error("Error: client password cannot be empty.")
            raise typer.Exit(1)

        try:
            user = await mgr.create_user_auth(client_id, client_password)
        except MissingBackendError as mbe:
            # The hashing backend required by passlib is not installed.
            logger.error(f"Please install backend: {mbe}")
            raise typer.Exit(code=1) from mbe

        if not user:
            logger.error(f"Error: could not create user: {client_id}")
            raise typer.Exit(code=1)

        logger.info(f"Success: created {user}")

    asyncio.run(run_create())

112 

113 

@user_app.command(name="rm")
def remove_user_auth(ctx: typer.Context,
                     client_id: Annotated[str, typer.Option("--client-id", "-c", help="id for the client to remove")]) -> None:
    """Remove a client from the authentication database."""
    async def run_remove() -> None:
        connect = db_connection_str(ctx.obj["type"], ctx.obj["username"], ctx.obj["host"], ctx.obj["port"],
                                    ctx.obj["filename"])
        mgr = UserManager(connect)

        # Look the client up first so an unknown id is rejected before asking for confirmation.
        user = await mgr.get_user_auth(client_id)
        if not user:
            # Failures are logged at ERROR level, matching the app callback and the `pwd` command.
            logger.error(f"Error: client '{client_id}' does not exist.")
            raise typer.Exit(1)

        # Declining the confirmation is a clean exit (code 0), not a failure.
        if not click.confirm(f"Please confirm the removal of '{client_id}'?"):
            raise typer.Exit(0)

        # The delete result is checked again: it can still come back empty after the lookup succeeded.
        user = await mgr.delete_user_auth(client_id)
        if not user:
            logger.error(f"Error: client '{client_id}' does not exist.")
            raise typer.Exit(1)

        logger.info(f"Success: '{user.username}' was removed.")

    asyncio.run(run_remove())

138 

139 

@user_app.command(name="pwd")
def change_password(
        ctx: typer.Context,
        client_id: Annotated[str, typer.Option("--client-id", "-c", help="id for the client whose password is changed")],
        ) -> None:
    """Update a user's password (prompted)."""
    async def run_password() -> None:
        # The new password is always prompted for (hidden input); it is never taken from the command line.
        client_password = click.prompt("Enter the client's new password", hide_input=True).strip()
        if not client_password:
            logger.error("Error: client password cannot be empty.")
            raise typer.Exit(1)

        connect = db_connection_str(ctx.obj["type"], ctx.obj["username"], ctx.obj["host"], ctx.obj["port"],
                                    ctx.obj["filename"])
        mgr = UserManager(connect)
        try:
            await mgr.update_user_auth_password(client_id, client_password)
        except MQTTError as me:
            # Same handling as the `sync` command: report and exit non-zero instead of a raw traceback.
            # NOTE(review): presumably raised when the client id is unknown -- confirm against UserManager.
            logger.error(f"Error: could not update password for client '{client_id}'.")
            raise typer.Exit(code=1) from me

        logger.info(f"Success: client '{client_id}' password updated.")

    asyncio.run(run_password())

158 

159 

if __name__ == "__main__":
    # Run the Typer application when this module is executed directly as a script.
    user_app()