Coverage for amqtt/contrib/shadows/messages.py: 97%
78 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-08-12 14:35 +0000
1from collections.abc import MutableMapping
2from dataclasses import dataclass, fields, is_dataclass
3import json
4from typing import Any
6from amqtt.contrib.shadows.states import MetaTimestamp, ShadowOperation, State, StateDocument
def asdict_no_none(obj: Any) -> Any:
    """Recursively convert *obj* to plain containers, dropping ``None`` entries.

    Conversion rules, applied recursively:

    * dataclass instance -> ``dict`` of its fields whose value is not ``None``
    * ``list`` -> new ``list`` without ``None`` items
    * ``dict`` -> new ``dict`` without keys whose value is ``None``
    * anything else (including a dataclass *type*) is returned unchanged

    Args:
        obj: The value to convert.

    Returns:
        The converted structure, or *obj* itself when no rule applies.
    """
    # ``is_dataclass`` is also true for dataclass *types*; only instances carry
    # per-field values, so a class object must not enter the field walk
    # (``getattr`` on it raises for fields without defaults).
    if is_dataclass(obj) and not isinstance(obj, type):
        return {
            f.name: asdict_no_none(value)
            for f in fields(obj)
            if (value := getattr(obj, f.name)) is not None
        }
    if isinstance(obj, list):
        return [asdict_no_none(item) for item in obj if item is not None]
    if isinstance(obj, dict):
        return {
            key: asdict_no_none(value)
            for key, value in obj.items()
            if value is not None
        }
    return obj
def create_shadow_topic(device_id: str, shadow_name: str, message_op: "ShadowOperation") -> str:
    """Build the ``$shadow/<device_id>/<shadow_name>/<message_op>`` topic string.

    Each argument is rendered with its default string formatting and the parts
    are joined with ``/`` after the literal ``$shadow`` prefix.
    """
    template = "$shadow/{}/{}/{}"
    return template.format(device_id, shadow_name, message_op)
class ShadowMessage:
    """Base class that gives shadow messages a JSON byte serialization."""

    def to_message(self) -> bytes:
        """Return this object as UTF-8 encoded JSON with ``None`` entries omitted.

        The object is first reduced to plain containers by ``asdict_no_none``
        and the result is then dumped with ``json.dumps``.
        """
        payload = asdict_no_none(self)
        encoded_text = json.dumps(payload)
        return encoded_text.encode("utf-8")
@dataclass
class GetAcceptedMessage(ShadowMessage):
    """Message whose topic is built from ``ShadowOperation.GET_ACCEPT``."""

    # Declaration order defines the positional order of the generated __init__.
    state: State[dict[str, Any]]
    metadata: State[MetaTimestamp]
    timestamp: int
    version: int

    @staticmethod
    def topic(device_id: str, shadow_name: str) -> str:
        """Return the ``GET_ACCEPT`` topic for the given device and shadow."""
        operation = ShadowOperation.GET_ACCEPT
        return create_shadow_topic(device_id, shadow_name, message_op=operation)
@dataclass
class GetRejectedMessage(ShadowMessage):
    """Message whose topic is built from ``ShadowOperation.GET_REJECT``."""

    code: int
    message: str
    # Optional: when left as None it is omitted by ``to_message``.
    timestamp: int | None = None

    @staticmethod
    def topic(device_id: str, shadow_name: str) -> str:
        """Return the ``GET_REJECT`` topic for the given device and shadow."""
        operation = ShadowOperation.GET_REJECT
        return create_shadow_topic(device_id, shadow_name, message_op=operation)
@dataclass
class UpdateAcceptedMessage(ShadowMessage):
    """Message whose topic is built from ``ShadowOperation.UPDATE_ACCEPT``."""

    # Declaration order defines the positional order of the generated __init__.
    state: State[dict[str, Any]]
    metadata: State[MetaTimestamp]
    timestamp: int
    version: int

    @staticmethod
    def topic(device_id: str, shadow_name: str) -> str:
        """Return the ``UPDATE_ACCEPT`` topic for the given device and shadow."""
        operation = ShadowOperation.UPDATE_ACCEPT
        return create_shadow_topic(device_id, shadow_name, message_op=operation)
@dataclass
class UpdateRejectedMessage(ShadowMessage):
    """Message whose topic is built from ``ShadowOperation.UPDATE_REJECT``."""

    # All three fields are required; none has a default.
    code: int
    message: str
    timestamp: int

    @staticmethod
    def topic(device_id: str, shadow_name: str) -> str:
        """Return the ``UPDATE_REJECT`` topic for the given device and shadow."""
        operation = ShadowOperation.UPDATE_REJECT
        return create_shadow_topic(device_id, shadow_name, message_op=operation)
@dataclass
class UpdateDeltaMessage(ShadowMessage):
    """Message whose topic is built from ``ShadowOperation.UPDATE_DELTA``."""

    # ``state`` and ``metadata`` are plain string-keyed mappings here.
    state: MutableMapping[str, Any]
    metadata: MutableMapping[str, Any]
    timestamp: int
    version: int

    @staticmethod
    def topic(device_id: str, shadow_name: str) -> str:
        """Return the ``UPDATE_DELTA`` topic for the given device and shadow."""
        operation = ShadowOperation.UPDATE_DELTA
        return create_shadow_topic(device_id, shadow_name, message_op=operation)
class UpdateIotaMessage(UpdateDeltaMessage):
    """Same fields as ``UpdateDeltaMessage``; only the topic operation differs."""

    @staticmethod
    def topic(device_id: str, shadow_name: str) -> str:
        """Return the ``UPDATE_IOTA`` topic for the given device and shadow."""
        operation = ShadowOperation.UPDATE_IOTA
        return create_shadow_topic(device_id, shadow_name, message_op=operation)
@dataclass
class UpdateDocumentMessage(ShadowMessage):
    """Message whose topic is built from ``ShadowOperation.UPDATE_DOCUMENTS``."""

    # Two ``StateDocument`` values plus a timestamp; all required.
    previous: StateDocument
    current: StateDocument
    timestamp: int

    @staticmethod
    def topic(device_id: str, shadow_name: str) -> str:
        """Return the ``UPDATE_DOCUMENTS`` topic for the given device and shadow."""
        operation = ShadowOperation.UPDATE_DOCUMENTS
        return create_shadow_topic(device_id, shadow_name, message_op=operation)