Coverage for amqtt/plugins/__init__.py: 94%
22 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1"""INIT."""
2import re
3from typing import Any, Optional
class TopicMatcher:
    """Singleton that tests topics against MQTT-style topic filters.

    Filters may contain the single-level wildcard ``+`` and the multi-level
    wildcard ``#``. Wildcard filters are translated to regular expressions,
    and the compiled patterns are cached on the shared instance so each
    distinct filter is compiled only once.
    """

    _instance: Optional["TopicMatcher"] = None

    def __init__(self) -> None:
        """Create the compiled-pattern cache the first time the singleton is built."""
        # __init__ runs on every ``TopicMatcher()`` call even though __new__
        # keeps returning the same object, so never reset an existing cache.
        if not hasattr(self, "_topic_filter_matchers"):
            self._topic_filter_matchers: dict[str, re.Pattern[str]] = {}

    def __new__(cls, *args: Any, **kwargs: Any) -> "TopicMatcher":
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            # object.__new__ rejects extra arguments when __new__ is
            # overridden, so constructor arguments are not forwarded.
            cls._instance = super().__new__(cls)
        return cls._instance

    @staticmethod
    def _compile_filter(a_filter: str) -> "re.Pattern[str]":
        """Translate a wildcard topic filter into a compiled regular expression."""
        if a_filter.endswith("/#"):
            # A trailing "/#" matches the parent level itself or anything
            # below it; the separator is required before any remainder, so
            # "sport/#" matches "sport" and "sport/x" but not "sports".
            prefix, tail = a_filter[:-2], "(?:/.*)?"
        else:
            prefix, tail = a_filter, ""
        body = (
            re.escape(prefix)
            .replace("\\#", ".*")       # bare "#": any remainder
            .replace("\\+", "[^/]*")    # "+": exactly one level, no separator
        )
        return re.compile(body + tail)

    def is_topic_allowed(self, topic: str, a_filter: str) -> bool:
        """Return True when ``topic`` is matched by ``a_filter``.

        Args:
            topic: the concrete topic name to test.
            a_filter: a topic filter, optionally containing ``+`` / ``#``.

        Returns:
            True if the filter matches the topic, False otherwise. Topics
            starting with ``$`` are never matched by a filter that starts
            with a wildcard.
        """
        if topic.startswith("$") and a_filter.startswith(("+", "#")):
            return False

        if "#" not in a_filter and "+" not in a_filter:
            # No wildcard in the filter: plain string equality is enough.
            return a_filter == topic

        # Compiling is comparatively expensive, so keep one pattern per filter.
        match_pattern = self._topic_filter_matchers.get(a_filter)
        if match_pattern is None:
            match_pattern = self._compile_filter(a_filter)
            self._topic_filter_matchers[a_filter] = match_pattern
        return bool(match_pattern.fullmatch(topic))

    def are_topics_allowed(self, topic: str, many_filters: list[str]) -> bool:
        """Return True when ``topic`` is matched by at least one of ``many_filters``."""
        return any(self.is_topic_allowed(topic, a_filter) for a_filter in many_filters)