Coverage for amqtt/contrib/auth_db/topic_mgr_cli.py: 81%

86 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1import asyncio 

2import contextlib 

3import logging 

4from pathlib import Path 

5from typing import Annotated 

6 

7import typer 

8 

9from amqtt.contexts import Action 

10from amqtt.contrib.auth_db import DBType, db_connection_str 

11from amqtt.contrib.auth_db.managers import TopicManager, UserManager 

12from amqtt.errors import MQTTError 

13 

# Typer application object that the commands below register themselves on;
# `no_args_is_help=True` is passed so Typer handles the no-argument invocation.
topic_app = typer.Typer(no_args_is_help=True)

# Root logging is configured to print only the message text, at INFO and above.
logging.basicConfig(format="%(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)

17 

18 

@topic_app.callback()
def main(
    ctx: typer.Context,
    db_type: Annotated[DBType, typer.Option("--db", "-d", help="db type", count=False)],
    db_username: Annotated[str, typer.Option("--username", "-u", help="db username", show_default=False)] = "",
    db_port: Annotated[int, typer.Option("--port", "-p", help="database port (defaults to db type)", show_default=False)] = 0,
    db_host: Annotated[str, typer.Option("--host", "-h", help="database host")] = "localhost",
    db_filename: Annotated[str, typer.Option("--file", "-f", help="database file name (sqlite only)")] = "auth.db",
) -> None:
    """Command line interface to add / remove topic authorization.

    Passwords are not allowed to be passed via the command line for security reasons. You will be prompted for database
    password (if applicable).

    If you need to manage topic authorizations programmatically, see `amqtt.contrib.auth_db.managers.TopicManager`
    which provides the underlying functionality to this command line interface.
    """
    # NOTE: the docstring above is rendered by Typer as the application's help text, so parameter
    # details live in each option's `help=` rather than in an "Args:" section.
    #
    # Validation performed before any subcommand runs (exits with code 1 on failure):
    #   * SQLite: the database file must already exist, except for the `sync` subcommand.
    #   * Any other database type: a username is required.
    if db_type == DBType.SQLITE:
        # `sync` is exempt from the existence check (presumably because it creates the file -- confirm).
        if ctx.invoked_subcommand != "sync" and not Path(db_filename).exists():
            logger.error(f"SQLite option could not find '{db_filename}'")
            raise typer.Exit(code=1)
    elif not db_username:
        logger.error("DB access requires a username be provided.")
        raise typer.Exit(code=1)

    # Connection settings are handed to the subcommands through the Typer context object.
    ctx.obj = {"type": db_type, "username": db_username, "host": db_host, "port": db_port, "filename": db_filename}

46 

47 

@topic_app.command(name="sync")
def db_sync(ctx: typer.Context) -> None:
    """Create the table and schema for username and topic lists for subscribe, publish or receive.

    Non-destructive if run multiple times. To clear the whole table, need to drop it manually.
    """
    # The docstring above is what Typer displays as the `sync` command help, so it stays user-facing.
    # `ctx.obj` carries the connection settings stored by the application callback.

    async def _sync_schema() -> None:
        options = ctx.obj
        dsn = db_connection_str(
            options["type"],
            options["username"],
            options["host"],
            options["port"],
            options["filename"],
        )
        manager = UserManager(dsn)
        try:
            await manager.db_sync()
        except MQTTError as err:
            # Report the failure and turn it into a non-zero exit status, keeping the cause chained.
            logger.critical("Could not sync schema on db.")
            raise typer.Exit(code=1) from err

    asyncio.run(_sync_schema())
    # Only reached when the coroutine finished without raising.
    logger.info("Success: database synced.")

64 

65 

@topic_app.command(name="list")
def list_clients(ctx: typer.Context) -> None:
    """List the topic authorizations stored for each client."""
    # NOTE: the docstring above is rendered by Typer as the `list` command help. It previously
    # described listing client ids with hashed passwords, which is not what this command does:
    # it logs every entry returned by `TopicManager.list_topic_auths()`.
    # `ctx.obj` carries the connection settings stored by the application callback.

    async def run_list() -> None:
        connect = db_connection_str(ctx.obj["type"], ctx.obj["username"], ctx.obj["host"], ctx.obj["port"], ctx.obj["filename"])
        mgr = TopicManager(connect)

        # A flag (rather than truthiness of the result) is used so this works for any iterable result.
        found_any = False
        for topic_auth in await mgr.list_topic_auths():
            found_any = True
            logger.info(topic_auth)

        if not found_any:
            logger.info("No client authorizations exist.")

    asyncio.run(run_list())

82 

83 

@topic_app.command(name="add")
def add_topic_allowance(
    ctx: typer.Context,
    topic: Annotated[str, typer.Argument(help="topic to allow", show_default=False)],
    client_id: Annotated[str, typer.Option("--client-id", "-c", help="id for the client", show_default=False)],
    action: Annotated[Action, typer.Option("--action", "-a", help="action for topic to allow", show_default=False)],
) -> None:
    """Allow a client to perform an action on a topic."""
    # NOTE: the docstring above is rendered by Typer as the `add` command help. It previously
    # described creating a user with a prompted password, which this command does not do.
    # `ctx.obj` carries the connection settings stored by the application callback.
    # Exits with code 1 when the client's record cannot be found or the topic is already allowed.

    async def run_add() -> None:
        connect = db_connection_str(ctx.obj["type"], ctx.obj["username"], ctx.obj["host"], ctx.obj["port"],
                                    ctx.obj["filename"])
        mgr = TopicManager(connect)

        # Best effort: try to create the client's record and deliberately ignore an MQTTError
        # (presumably raised when the record already exists -- confirm). The lookup right below
        # still catches the case where no record is available.
        with contextlib.suppress(MQTTError):
            await mgr.create_topic_auth(client_id)

        topic_auth = await mgr.get_topic_auth(client_id)
        if not topic_auth:
            logger.error(f"Topic auth doesn't exist for '{client_id}'")
            raise typer.Exit(code=1)

        # Refuse to add the same topic twice for the same action.
        if any(topic == allowed_topic.topic for allowed_topic in topic_auth.get_topic_list(action)):
            logger.error(f"Topic '{topic}' already exists for '{action}'.")
            raise typer.Exit(code=1)

        await mgr.add_allowed_topic(client_id, topic, action)

        logger.info(f"Success: topic '{topic}' added to {action} for '{client_id}'")

    asyncio.run(run_add())

115 

116 

@topic_app.command(name="rm")
def remove_topic_allowance(
    ctx: typer.Context,
    client_id: Annotated[str, typer.Option("--client-id", "-c", help="id for the client")],
    action: Annotated[Action, typer.Option("--action", "-a", help="action to remove the topic from")],
    topic: Annotated[str, typer.Argument(help="topic to remove")],
) -> None:
    """Remove a topic from a client's allow list for an action."""
    # NOTE: the docstring above is rendered by Typer as the `rm` command help. It previously
    # described removing a client from the authentication database; this command only removes
    # one topic from one action's allow list.
    # `ctx.obj` carries the connection settings stored by the application callback.
    # Exits with code 1 when the client is unknown, the topic is not in the list, or removal fails.

    async def run_remove() -> None:
        connect = db_connection_str(ctx.obj["type"], ctx.obj["username"], ctx.obj["host"], ctx.obj["port"],
                                    ctx.obj["filename"])
        mgr = TopicManager(connect)

        topic_auth = await mgr.get_topic_auth(client_id)

        if not topic_auth:
            logger.error(f"client '{client_id}' doesn't exist.")
            raise typer.Exit(code=1)

        # NOTE(review): membership is tested directly against the `<action>_acl` entries here, while
        # the `add` command compares against each entry's `.topic` -- confirm both checks agree.
        if topic not in getattr(topic_auth, f"{action}_acl"):
            logger.error(f"Error: topic '{topic}' not in the {action} allow list for {client_id}.")
            raise typer.Exit(code=1)

        try:
            await mgr.remove_allowed_topic(client_id, topic, action)
        except MQTTError as me:
            # The message used to start with a stray quote character ("'Error: ...").
            logger.error(f"Error: could not remove '{topic}' for client '{client_id}'.")
            raise typer.Exit(code=1) from me

        logger.info(f"Success: removed topic '{topic}' from {action} for '{client_id}'")

    asyncio.run(run_remove())

148 

149 

# Script entry point: invoke the Typer application when this module is executed directly.
if __name__ == "__main__":
    topic_app()