Coverage for amqtt/contrib/shadows/messages.py: 97%

78 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-08-12 14:35 +0000

1from collections.abc import MutableMapping 

2from dataclasses import dataclass, fields, is_dataclass 

3import json 

4from typing import Any 

5 

6from amqtt.contrib.shadows.states import MetaTimestamp, ShadowOperation, State, StateDocument 

7 

8 

9def asdict_no_none(obj: Any) -> Any: 

10 """Create dictionary from dataclass, but eliminate any key set to `None`.""" 

11 if is_dataclass(obj): 

12 result = {} 

13 for f in fields(obj): 

14 value = getattr(obj, f.name) 

15 if value is not None: 

16 result[f.name] = asdict_no_none(value) 

17 return result 

18 if isinstance(obj, list): 18 ↛ 19line 18 didn't jump to line 19 because the condition on line 18 was never true

19 return [asdict_no_none(item) for item in obj if item is not None] 

20 if isinstance(obj, dict): 

21 return { 

22 key: asdict_no_none(value) 

23 for key, value in obj.items() 

24 if value is not None 

25 } 

26 return obj 

27 

28 

29def create_shadow_topic(device_id: str, shadow_name: str, message_op: "ShadowOperation") -> str: 

30 """Create a shadow topic for message type.""" 

31 return f"$shadow/{device_id}/{shadow_name}/{message_op}" 

32 

33 

34class ShadowMessage: 

35 def to_message(self) -> bytes: 

36 return json.dumps(asdict_no_none(self)).encode("utf-8") 

37 

38 

39@dataclass 

40class GetAcceptedMessage(ShadowMessage): 

41 state: State[dict[str, Any]] 

42 metadata: State[MetaTimestamp] 

43 timestamp: int 

44 version: int 

45 

46 @staticmethod 

47 def topic(device_id: str, shadow_name: str) -> str: 

48 return create_shadow_topic(device_id, shadow_name, ShadowOperation.GET_ACCEPT) 

49 

50 

51@dataclass 

52class GetRejectedMessage(ShadowMessage): 

53 code: int 

54 message: str 

55 timestamp: int | None = None 

56 

57 @staticmethod 

58 def topic(device_id: str, shadow_name: str) -> str: 

59 return create_shadow_topic(device_id, shadow_name, ShadowOperation.GET_REJECT) 

60 

61 

62@dataclass 

63class UpdateAcceptedMessage(ShadowMessage): 

64 state: State[dict[str, Any]] 

65 metadata: State[MetaTimestamp] 

66 timestamp: int 

67 version: int 

68 

69 @staticmethod 

70 def topic(device_id: str, shadow_name: str) -> str: 

71 return create_shadow_topic(device_id, shadow_name, ShadowOperation.UPDATE_ACCEPT) 

72 

73 

74@dataclass 

75class UpdateRejectedMessage(ShadowMessage): 

76 code: int 

77 message: str 

78 timestamp: int 

79 

80 @staticmethod 

81 def topic(device_id: str, shadow_name: str) -> str: 

82 return create_shadow_topic(device_id, shadow_name, ShadowOperation.UPDATE_REJECT) 

83 

84 

85@dataclass 

86class UpdateDeltaMessage(ShadowMessage): 

87 state: MutableMapping[str, Any] 

88 metadata: MutableMapping[str, Any] 

89 timestamp: int 

90 version: int 

91 

92 @staticmethod 

93 def topic(device_id: str, shadow_name: str) -> str: 

94 return create_shadow_topic(device_id, shadow_name, ShadowOperation.UPDATE_DELTA) 

95 

96 

97class UpdateIotaMessage(UpdateDeltaMessage): 

98 """Same format, corollary name.""" 

99 

100 @staticmethod 

101 def topic(device_id: str, shadow_name: str) -> str: 

102 return create_shadow_topic(device_id, shadow_name, ShadowOperation.UPDATE_IOTA) 

103 

104 

105@dataclass 

106class UpdateDocumentMessage(ShadowMessage): 

107 previous: StateDocument 

108 current: StateDocument 

109 timestamp: int 

110 

111 @staticmethod 

112 def topic(device_id: str, shadow_name: str) -> str: 

113 return create_shadow_topic(device_id, shadow_name, ShadowOperation.UPDATE_DOCUMENTS)