Coverage for amqtt/contrib/ldap.py: 80%
86 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1from dataclasses import dataclass
2import logging
3from typing import ClassVar
5import ldap
7from amqtt.broker import BrokerContext
8from amqtt.contexts import Action
9from amqtt.errors import PluginInitError
10from amqtt.plugins import TopicMatcher
11from amqtt.plugins.base import BaseAuthPlugin, BasePlugin, BaseTopicPlugin
12from amqtt.session import Session
14logger = logging.getLogger(__name__)
17@dataclass
18class LdapConfig:
19 """Configuration for the LDAP Plugins."""
21 server: str
22 """uri formatted server location. e.g `ldap://localhost:389`"""
23 base_dn: str
24 """distinguished name (dn) of the ldap server. e.g. `dc=amqtt,dc=io`"""
25 user_attribute: str
26 """attribute in ldap entry to match the username against"""
27 bind_dn: str
28 """distinguished name (dn) of known, preferably read-only, user. e.g. `cn=admin,dc=amqtt,dc=io`"""
29 bind_password: str
30 """password for known, preferably read-only, user"""
33class AuthLdapPlugin(BasePlugin[BrokerContext]):
35 def __init__(self, context: BrokerContext) -> None:
36 super().__init__(context)
38 self.conn = ldap.initialize(self.config.server)
39 self.conn.protocol_version = ldap.VERSION3 # pylint: disable=E1101
40 try:
41 self.conn.simple_bind_s(self.config.bind_dn, self.config.bind_password)
42 except ldap.INVALID_CREDENTIALS as e: # pylint: disable=E1101
43 raise PluginInitError(self.__class__) from e
46class UserAuthLdapPlugin(AuthLdapPlugin, BaseAuthPlugin):
47 """Plugin to authenticate a user with an LDAP directory server."""
49 async def authenticate(self, *, session: Session) -> bool | None:
51 # use our initial creds to see if the user exists
52 search_filter = f"({self.config.user_attribute}={session.username})"
53 result = self.conn.search_s(self.config.base_dn, ldap.SCOPE_SUBTREE, search_filter, ["dn"]) # pylint: disable=E1101
54 if not result: 54 ↛ 55line 54 didn't jump to line 55 because the condition on line 54 was never true
55 logger.debug(f"user not found: {session.username}")
56 return False
58 try:
59 # `search_s` responds with list of tuples: (dn, entry); first in list is our match
60 user_dn = result[0][0]
61 except IndexError:
62 return False
64 try:
65 user_conn = ldap.initialize(self.config.server)
66 user_conn.simple_bind_s(user_dn, session.password)
67 except ldap.INVALID_CREDENTIALS: # pylint: disable=E1101
68 logger.debug(f"invalid credentials for '{session.username}'")
69 return False
70 except ldap.LDAPError as e: # pylint: disable=E1101
71 logger.debug(f"LDAP error during user bind: {e}")
72 return False
74 return True
76 @dataclass
77 class Config(LdapConfig):
78 """Configuration for the User Auth LDAP Plugin."""
81class TopicAuthLdapPlugin(AuthLdapPlugin, BaseTopicPlugin):
82 """Plugin to authenticate a user with an LDAP directory server."""
84 _action_attr_map: ClassVar = {
85 Action.PUBLISH: "publish_attribute",
86 Action.SUBSCRIBE: "subscribe_attribute",
87 Action.RECEIVE: "receive_attribute"
88 }
90 def __init__(self, context: BrokerContext) -> None:
91 super().__init__(context)
93 self.topic_matcher = TopicMatcher()
95 async def topic_filtering(
96 self, *, session: Session | None = None, topic: str | None = None, action: Action | None = None
97 ) -> bool | None:
99 # if not provided needed criteria, can't properly evaluate topic filtering
100 if not session or not action or not topic: 100 ↛ 101line 100 didn't jump to line 101 because the condition on line 100 was never true
101 return None
103 search_filter = f"({self.config.user_attribute}={session.username})"
104 attrs = [
105 "cn",
106 self.config.publish_attribute,
107 self.config.subscribe_attribute,
108 self.config.receive_attribute
109 ]
110 results = self.conn.search_s(self.config.base_dn, ldap.SCOPE_SUBTREE, search_filter, attrs) # pylint: disable=E1101
112 if not results: 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true
113 logger.debug(f"user not found: {session.username}")
114 return False
116 if len(results) > 1: 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true
117 found_users = [dn for dn, _ in results]
118 logger.debug(f"multiple users found: {', '.join(found_users)}")
119 return False
121 dn, entry = results[0]
123 ldap_attribute = getattr(self.config, self._action_attr_map[action])
124 topic_filters = [t.decode("utf-8") for t in entry.get(ldap_attribute, [])]
125 logger.debug(f"DN: {dn} - {ldap_attribute}={topic_filters}")
127 return self.topic_matcher.are_topics_allowed(topic, topic_filters)
129 @dataclass
130 class Config(LdapConfig):
131 """Configuration for the LDAPAuthPlugin."""
133 publish_attribute: str
134 """LDAP attribute which contains a list of permissible publish topics."""
135 subscribe_attribute: str
136 """LDAP attribute which contains a list of permissible subscribe topics."""
137 receive_attribute: str
138 """LDAP attribute which contains a list of permissible receive topics."""