Coverage for amqtt/contrib/ldap.py: 80%

86 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

from dataclasses import dataclass, field
import logging
from typing import ClassVar

import ldap
from ldap.filter import escape_filter_chars

from amqtt.broker import BrokerContext
from amqtt.contexts import Action
from amqtt.errors import PluginInitError
from amqtt.plugins import TopicMatcher
from amqtt.plugins.base import BaseAuthPlugin, BasePlugin, BaseTopicPlugin
from amqtt.session import Session

13 

14logger = logging.getLogger(__name__) 

15 

16 

@dataclass
class LdapConfig:
    """Configuration for the LDAP Plugins.

    Holds the server location, the search base, the attribute used to look up a
    user, and the credentials of the service account that performs the lookups.
    """

    server: str
    """uri formatted server location. e.g. `ldap://localhost:389`"""
    base_dn: str
    """distinguished name (dn) used as the base of user searches. e.g. `dc=amqtt,dc=io`"""
    user_attribute: str
    """attribute in ldap entry to match the username against"""
    bind_dn: str
    """distinguished name (dn) of known, preferably read-only, user. e.g. `cn=admin,dc=amqtt,dc=io`"""
    # `repr=False` keeps the secret out of the auto-generated `__repr__`, so logging or
    # printing the config object cannot leak it. The field is still a required
    # constructor argument in the same position.
    bind_password: str = field(repr=False)
    """password for known, preferably read-only, user"""

31 

32 

class AuthLdapPlugin(BasePlugin[BrokerContext]):
    """Shared base for the LDAP plugins; owns the service-account connection in `self.conn`."""

    def __init__(self, context: BrokerContext) -> None:
        """Open a connection to the configured server and bind with the service account.

        Raises:
            PluginInitError: if the server rejects the configured bind credentials.
        """
        super().__init__(context)

        settings = self.config
        self.conn = ldap.initialize(settings.server)
        self.conn.protocol_version = ldap.VERSION3  # pylint: disable=E1101

        # bind eagerly so that bad service credentials surface when the plugin is created
        try:
            self.conn.simple_bind_s(settings.bind_dn, settings.bind_password)
        except ldap.INVALID_CREDENTIALS as bind_error:  # pylint: disable=E1101
            raise PluginInitError(self.__class__) from bind_error

44 

45 

class UserAuthLdapPlugin(AuthLdapPlugin, BaseAuthPlugin):
    """Plugin to authenticate a user with an LDAP directory server."""

    async def authenticate(self, *, session: Session) -> bool | None:
        """Check the session's username and password against the directory.

        The service connection (`self.conn`) is used to find the user's entry; the
        password is then verified by binding as that entry on a separate connection
        which is closed again before returning.

        Note: the python-ldap `*_s` calls used here are synchronous.

        Args:
            session: client session carrying the `username` and `password` to verify.

        Returns:
            `True` if the user was found and the bind with the supplied password
            succeeded; `False` in every other case (missing credentials, unknown user,
            wrong password, or an LDAP error).
        """
        # A simple bind with a DN but an empty password is an "unauthenticated bind"
        # (RFC 4513, section 5.1.2) which a server may accept without checking anything,
        # so missing credentials must never be forwarded to the directory.
        if not session.username or not session.password:
            logger.debug("missing username or password")
            return False

        # use our initial creds to see if the user exists; the username comes from the
        # client, so escape it to keep filter metacharacters from rewriting the query
        search_filter = f"({self.config.user_attribute}={escape_filter_chars(session.username)})"
        try:
            result = self.conn.search_s(  # pylint: disable=E1101
                self.config.base_dn, ldap.SCOPE_SUBTREE, search_filter, ["dn"]
            )
        except ldap.LDAPError as e:  # pylint: disable=E1101
            # same policy as a failed bind below: a directory error denies the login
            logger.warning("LDAP error during user search: %s", e)
            return False

        # `search_s` responds with list of tuples: (dn, entry); tuples without a dn
        # (search references) are not user entries and are skipped
        matching_dns = [dn for dn, _ in result if dn]
        if not matching_dns:
            logger.debug("user not found: %s", session.username)
            return False

        # first in list is our match
        user_dn = matching_dns[0]

        try:
            user_conn = ldap.initialize(self.config.server)
            try:
                user_conn.simple_bind_s(user_dn, session.password)
            finally:
                # always release the per-login connection; a cleanup failure must not
                # change the outcome of the bind itself
                try:
                    user_conn.unbind_s()
                except ldap.LDAPError:  # pylint: disable=E1101
                    logger.debug("LDAP error while closing user connection", exc_info=True)
        except ldap.INVALID_CREDENTIALS:  # pylint: disable=E1101
            logger.debug("invalid credentials for '%s'", session.username)
            return False
        except ldap.LDAPError as e:  # pylint: disable=E1101
            logger.debug("LDAP error during user bind: %s", e)
            return False

        return True

    @dataclass
    class Config(LdapConfig):
        """Configuration for the User Auth LDAP Plugin."""

79 

80 

class TopicAuthLdapPlugin(AuthLdapPlugin, BaseTopicPlugin):
    """Plugin to authorize topic access from topic lists stored on the user's LDAP entry."""

    # maps each action to the name of the config field holding the LDAP attribute to read
    _action_attr_map: ClassVar = {
        Action.PUBLISH: "publish_attribute",
        Action.SUBSCRIBE: "subscribe_attribute",
        Action.RECEIVE: "receive_attribute",
    }

    def __init__(self, context: BrokerContext) -> None:
        """Bind the service connection (via the base class) and create the topic matcher."""
        super().__init__(context)

        self.topic_matcher = TopicMatcher()

    async def topic_filtering(
        self, *, session: Session | None = None, topic: str | None = None, action: Action | None = None
    ) -> bool | None:
        """Decide whether the session's user may perform `action` on `topic`.

        The user's entry is looked up with the service connection and the values of the
        attribute configured for `action` are handed, together with `topic`, to
        `TopicMatcher.are_topics_allowed`.

        Note: the python-ldap `search_s` call used here is synchronous.

        Args:
            session: client session whose `username` identifies the LDAP entry.
            topic: topic being published to, subscribed to, or received.
            action: which of the configured attributes to consult.

        Returns:
            `None` if `session`, `topic` or `action` is missing; `False` if the session
            has no username, the lookup fails, or it does not yield exactly one entry;
            otherwise the result of the topic matcher.
        """
        # if not provided needed criteria, can't properly evaluate topic filtering
        if not session or not action or not topic:
            return None

        # without a username there is no entry to look up; formatting `None` into the
        # filter would search for a user literally named "None"
        if not session.username:
            logger.debug("no username in session; denying topic access")
            return False

        # the username comes from the client, so escape it to keep filter
        # metacharacters from rewriting the query
        search_filter = f"({self.config.user_attribute}={escape_filter_chars(session.username)})"
        attrs = [
            "cn",
            self.config.publish_attribute,
            self.config.subscribe_attribute,
            self.config.receive_attribute,
        ]
        try:
            results = self.conn.search_s(  # pylint: disable=E1101
                self.config.base_dn, ldap.SCOPE_SUBTREE, search_filter, attrs
            )
        except ldap.LDAPError as e:  # pylint: disable=E1101
            # a directory error denies access instead of propagating to the caller
            logger.warning("LDAP error during topic lookup: %s", e)
            return False

        # tuples without a dn (search references) are not user entries; dropping them
        # keeps the join and the `entry.get` below operating on real entries only
        entries = [(dn, entry) for dn, entry in results if dn]

        if not entries:
            logger.debug("user not found: %s", session.username)
            return False

        if len(entries) > 1:
            found_users = [dn for dn, _ in entries]
            logger.debug("multiple users found: %s", ", ".join(found_users))
            return False

        dn, entry = entries[0]

        ldap_attribute = getattr(self.config, self._action_attr_map[action])
        # attribute values are decoded from bytes before matching
        topic_filters = [t.decode("utf-8") for t in entry.get(ldap_attribute, [])]
        logger.debug("DN: %s - %s=%s", dn, ldap_attribute, topic_filters)

        return self.topic_matcher.are_topics_allowed(topic, topic_filters)

    @dataclass
    class Config(LdapConfig):
        """Configuration for the Topic Auth LDAP Plugin."""

        publish_attribute: str
        """LDAP attribute which contains a list of permissible publish topics."""
        subscribe_attribute: str
        """LDAP attribute which contains a list of permissible subscribe topics."""
        receive_attribute: str
        """LDAP attribute which contains a list of permissible receive topics."""

138 """LDAP attribute which contains a list of permissible receive topics."""