# led_lamp.py
# MicroPython script: touch-controlled PWM LED lamp that sends telemetry to
# ThingsBoard over MQTT and accepts a "setBrightnessPct" server-side RPC.
from machine import Pin, PWM
import network, time
# Wi-Fi credentials (placeholders; replace with real values before flashing)
WIFI_SSID = "YOUR_NETWORK_SSID"
WIFI_PASSWORD = "YOUR_PASSWORD"

# Connect to Wi-Fi using the station interface
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
    print('Connecting to network...')
    wlan.connect(WIFI_SSID, WIFI_PASSWORD)
    # Poll with a short sleep instead of spinning on `pass`, so the wait does
    # not run as a tight busy loop. NOTE: there is still no timeout; this
    # waits for as long as the network is unreachable.
    while not wlan.isconnected():
        time.sleep_ms(100)
print("connected:", wlan.isconnected())
print("ifconfig:", wlan.ifconfig())
import mip

# Install the ThingsBoard SDK only if it's missing: try the import first and
# fall back to mip.install() when the package cannot be imported.
try:
    from thingsboard_sdk.tb_device_mqtt import TBDeviceMqttClient
    print("thingsboard-micropython-client-sdk package already installed.")
except ImportError:
    print("Installing thingsboard-micropython-client-sdk package...")
    mip.install('github:thingsboard/thingsboard-micropython-client-sdk')
    # Retry the import now that the install step has run; a second ImportError
    # here is not handled and will stop the script.
    from thingsboard_sdk.tb_device_mqtt import TBDeviceMqttClient
# ThingsBoard connection settings (placeholders; replace before deploying)
HOST = "YOUR_HOST"
# NOTE(review): the placeholder is a string; presumably the MQTT client expects
# a numeric port - confirm against the SDK before filling this in.
PORT = "YOUR_PORT"
ACCESS_TOKEN = "YOUR_ACCESS_TOKEN"

# GPIO mapping
SENSOR_PIN = 18
LED_WHITE_PIN = 23

# 16-bit PWM max value for duty_u16()
U16_MAX = 65535

# Fade behavior tuning (time-based)
FULL_PERIOD_MS = 10000  # hold time that maps to a full 0..U16_MAX sweep
FADE_UPDATE_MS = 100    # delay between brightness updates while fading

# Loop/telemetry intervals
STAT_PERIOD_MS = 10_000
MAIN_LOOP_SLEEP_MS = 10
RELEASE_POLL_MS = 100

# RPC method name expected from ThingsBoard dashboard
RPC_METHOD_SET_BRIGHTNESS = "setBrightnessPct"

# Initialize input (touch sensor) and PWM output (LED)
sensor = Pin(SENSOR_PIN, Pin.IN)
led_pwm = PWM(Pin(LED_WHITE_PIN), freq=1000)
led_pwm.duty_u16(0)  # start OFF

# Shared state used by touch logic + RPC logic
state = {
    "brightness": 0,            # current PWM duty, 0..U16_MAX
    "direction_up": True,       # direction the next fade will take
    "percentage_light": 0,
    "is_touched": False,
    "fade_elapsed_ms": 0,       # hold time accumulated in the current sweep
    # (request_id, reply) queued by the RPC callback and sent from the main
    # loop; declared here so every key of the shared state is visible in one
    # place. None means nothing is waiting.
    "pending_rpc_reply": None,
}
def set_brightness_u16(x):
    # Limit the requested duty to 0..U16_MAX and write it to the LED PWM channel
    duty = max(0, min(x, U16_MAX))
    led_pwm.duty_u16(duty)
def calculate_value_from_time(passed_ms):
    # Map elapsed fade time onto 0..U16_MAX: non-positive times give 0,
    # times at or beyond FULL_PERIOD_MS give U16_MAX, and everything in
    # between is scaled linearly with round-to-nearest integer division.
    if passed_ms <= 0:
        return 0
    if passed_ms >= FULL_PERIOD_MS:
        return U16_MAX
    rounding_bias = FULL_PERIOD_MS // 2
    scaled = passed_ms * U16_MAX
    return (scaled + rounding_bias) // FULL_PERIOD_MS
def wait_release(sensor):
    # Block until the touch sensor no longer reads 1, re-checking it every
    # RELEASE_POLL_MS milliseconds.
    still_pressed = sensor.value() == 1
    while still_pressed:
        time.sleep_ms(RELEASE_POLL_MS)
        still_pressed = sensor.value() == 1
def connect_to_broker(client):
    # Try to open the MQTT connection. Returns True on success; on any
    # exception the failure is printed and False is returned instead of
    # letting the error propagate.
    connected = False
    try:
        client.connect()
    except OSError as err:
        print("[TB] connect OSError:", err)
    except Exception as err:
        print(f"Failed to connect to MQTT broker: {err}")
    else:
        print("Connected to MQTT broker")
        connected = True
    return connected
def send_state_telemetry(client, sensor, state):
    # Build a telemetry snapshot from the shared state plus the live sensor
    # reading, try to publish it, and return the snapshot either way.
    level = state["brightness"]
    # Duty -> percent with round-to-nearest integer division
    rounded_pct = (level * 100 + (U16_MAX // 2)) // U16_MAX
    telemetry = {
        "brightness": level,
        "is_touched": bool(sensor.value()),
        "percentage_light": rounded_pct,
        "is_growing": state["direction_up"],
    }

    # A failed publish is only logged; the caller still gets the payload
    try:
        client.send_telemetry(telemetry)
    except Exception as err:
        print("[TB] send_telemetry failed:", err)
    return telemetry
def parse_rpc_brightness_params(params):
    # Validate an RPC brightness percentage and clamp it to 0..100.
    #
    # Accepts ints, floats (truncated toward zero) and numeric strings,
    # including decimal strings such as "42.7".
    # Raises ValueError for None and for anything that cannot be read as a
    # finite number (dicts, lists, non-numeric text, inf/nan), so callers only
    # need to handle one exception type for bad input.
    if params is None:
        raise ValueError("Params is None")

    try:
        percents = int(params)
    except (TypeError, ValueError, OverflowError):
        # int() rejects decimal strings and non-scalars; retry through float()
        # so "42.7" behaves like the float 42.7, and report everything else
        # as ValueError.
        try:
            percents = int(float(params))
        except (TypeError, ValueError, OverflowError):
            raise ValueError("Params is not a number: " + repr(params))

    return max(0, min(percents, 100))
def on_server_side_rpc_request(request_id, request_body):
    # Handler registered for server-side RPC requests.
    #
    # Only RPC_METHOD_SET_BRIGHTNESS is handled: its params are read as a
    # percentage, converted to a PWM duty, applied to the LED and stored in
    # the shared state. The reply is queued in state["pending_rpc_reply"]
    # instead of being published from inside the callback. Errors are printed,
    # never raised.
    print("[RPC] id:", request_id, "body:", request_body)

    try:
        method = request_body.get("method")
        params = request_body.get("params")
    except AttributeError:
        print("[RPC] bad request format (not a dict)")
        return

    # Ignore unknown methods (keep callback strict)
    if method != RPC_METHOD_SET_BRIGHTNESS:
        print("Such method is not supported:", method)
        return

    try:
        # Convert percent to PWM duty (integer math) and apply immediately
        pct = parse_rpc_brightness_params(params)
        brightness = (pct * U16_MAX) // 100
        state["brightness"] = brightness
        set_brightness_u16(brightness)

        # Keep fade direction consistent after remote control
        if brightness <= 0:
            state["direction_up"] = True
        elif brightness >= U16_MAX:
            state["direction_up"] = False

        # Re-sync the fade timeline with the new level. fade() derives the
        # brightness purely from state["fade_elapsed_ms"], so leaving the old
        # value would make the next touch jump back to the pre-RPC position.
        # Floor division keeps the result below FULL_PERIOD_MS for any level
        # that is not exactly at an edge.
        if state["direction_up"]:
            travelled = brightness
        else:
            travelled = U16_MAX - brightness
        state["fade_elapsed_ms"] = (travelled * FULL_PERIOD_MS) // U16_MAX

        # Defer reply sending to the main loop (safer than publishing inside callback)
        reply = {
            "brightness": brightness,
            "percentage_light": pct,
            "is_growing": state["direction_up"],
        }
        state["pending_rpc_reply"] = (request_id, reply)
    except ValueError as e:
        print("[RPC] invalid params:", e)
    except Exception as e:
        print("[RPC] handler error:", e)
def safe_check_msg(client):
    # Poll the client for incoming MQTT packets (RPC/attribute updates).
    # Returns True when the poll completed without raising, False otherwise.
    try:
        client.check_for_msg()
    except OSError as err:
        print("[TB] check_msg OSError:", err)
        # One reconnect attempt per failed poll
        reconnected = connect_to_broker(client)
        if not reconnected:
            print("[TB] Reconnection failed")
    except Exception as err:
        print(f"Failed to check messages: {err}")
    else:
        return True
    return False
def send_pending_rpc_reply(client, state):
    # Publish the RPC reply queued by the callback, if there is one. The
    # queue slot is cleared before publishing, so a failed publish is logged
    # and not retried.
    queued = state.get("pending_rpc_reply")
    if queued is None:
        return

    request_id, reply = queued
    state["pending_rpc_reply"] = None

    try:
        client.send_rpc_reply(request_id, reply)
    except Exception as err:
        print("[RPC] publish failed in main loop:", err)

    # Mirror the reply as telemetry as well (for easier dashboard sync)
    try:
        client.send_telemetry(reply)
    except Exception as err:
        print("[TB] send_telemetry failed in main loop:", err)
def fade(client, sensor, state, direction_up):
    # Ramp the LED while the touch sensor reads 1.
    #
    # The fade position is time-based: the hold time of this press is added to
    # state["fade_elapsed_ms"] (time carried over from earlier presses), capped
    # at FULL_PERIOD_MS and mapped to a duty with calculate_value_from_time().
    # direction_up=True ramps towards U16_MAX, False ramps towards 0.
    # Telemetry is sent on every update step.
    state["direction_up"] = direction_up  # used by telemetry/UI

    started_at = time.ticks_ms()
    carried_ms = state.get("fade_elapsed_ms", 0)

    while sensor.value() == 1:
        total_ms = carried_ms + time.ticks_diff(time.ticks_ms(), started_at)
        total_ms = min(total_ms, FULL_PERIOD_MS)
        at_edge = total_ms == FULL_PERIOD_MS

        ramp = calculate_value_from_time(total_ms)  # 0..U16_MAX
        if direction_up:
            level = U16_MAX if at_edge else ramp
        else:
            level = 0 if at_edge else U16_MAX - ramp
        state["brightness"] = level

        if at_edge:
            # Hold at the limit while still pressed; report the reversed
            # direction and mark the sweep as completed.
            state["direction_up"] = not direction_up
            state["fade_elapsed_ms"] = FULL_PERIOD_MS

        set_brightness_u16(level)
        send_state_telemetry(client, sensor, state)
        time.sleep_ms(FADE_UPDATE_MS)

    # Released: either remember the accumulated time so the next press
    # continues from here, or - if the sweep reached its end - reset the
    # timeline and flip the direction for the next fade cycle.
    total_ms = carried_ms + time.ticks_diff(time.ticks_ms(), started_at)
    if total_ms >= FULL_PERIOD_MS:
        state["fade_elapsed_ms"] = 0
        state["direction_up"] = not direction_up
    else:
        state["fade_elapsed_ms"] = total_ms
def main():
    # Entry point: create the ThingsBoard client, register the RPC handler,
    # then loop forever handling touch presses, periodic telemetry and MQTT
    # traffic. On exit the LED is switched off and the client disconnected.
    client = TBDeviceMqttClient(HOST, PORT, access_token=ACCESS_TOKEN)
    client.set_server_side_rpc_request_handler(on_server_side_rpc_request)
    connect_to_broker(client)
    send_state_telemetry(client, sensor, state)
    set_brightness_u16(0)

    was_touched = 0
    stat_sent_at = time.ticks_ms()

    try:
        while True:
            # Press event = sensor went from 0 to 1 since the last iteration
            is_touched = sensor.value()
            pressed_now = is_touched == 1 and was_touched == 0
            if pressed_now:
                # fade() blocks until the sensor is released
                fade(client, sensor, state, direction_up=bool(state["direction_up"]))
                print("Released; holding brightness:", state["brightness"])
            was_touched = is_touched

            # Periodic "status" telemetry (even if no touches happen)
            current_ms = time.ticks_ms()
            if time.ticks_diff(current_ms, stat_sent_at) >= STAT_PERIOD_MS:
                stat_sent_at = current_ms
                snapshot = send_state_telemetry(client, sensor, state)
                print("Stat telemetry:", snapshot)

            # Process RPC messages and send pending replies
            safe_check_msg(client)
            send_pending_rpc_reply(client, state)

            time.sleep_ms(MAIN_LOOP_SLEEP_MS)
    finally:
        # Best-effort shutdown: LED off first, then disconnect
        try:
            set_brightness_u16(0)
        except Exception:
            pass
        try:
            client.disconnect()
        except Exception as err:
            print("Could not disconnect client:", err)


main()