1
1
"""Aqara E1 Radiator Thermostat Quirk."""
2
-
3
2
from __future__ import annotations
4
3
5
4
from functools import reduce
6
5
import math
7
6
import struct
7
+ import time
8
8
from typing import Any
9
9
10
10
from zigpy .profiles import zha
49
49
SENSOR = 0x027E
50
50
BATTERY_PERCENTAGE = 0x040A
51
51
52
+ SENSOR_TEMP = 0x1392 # Fake address to pass external sensor temperature
53
+ SENSOR_ATTR = 0xFFF2
54
+ SENSOR_ATTR_NAME = "sensor_attr"
55
+
52
56
XIAOMI_CLUSTER_ID = 0xFCC0
53
57
54
58
DAYS_MAP = {
@@ -140,12 +144,11 @@ async def write_attributes(
140
144
141
145
142
146
class ScheduleEvent :
143
- """Schedule event object. """
147
+ """Schedule event object"""
144
148
145
149
_is_next_day = False
146
150
147
151
def __init__ (self , value , is_next_day = False ):
148
- """Create ScheduleEvent object from bytes or string."""
149
152
if isinstance (value , bytes ):
150
153
self ._verify_buffer_len (value )
151
154
self ._time = self ._read_time_from_buf (value )
@@ -178,8 +181,8 @@ def _read_time_from_buf(buf):
178
181
return time
179
182
180
183
@staticmethod
181
- def _parse_time (string ):
182
- parts = string .split (":" )
184
+ def _parse_time (str ):
185
+ parts = str .split (":" )
183
186
if len (parts ) != 2 :
184
187
raise ValueError ("Time must contain ':' separator" )
185
188
@@ -193,8 +196,8 @@ def _read_temp_from_buf(buf):
193
196
return struct .unpack_from (">H" , buf , offset = 4 )[0 ] / 100
194
197
195
198
@staticmethod
196
- def _parse_temp (string ):
197
- return float (string )
199
+ def _parse_temp (str ):
200
+ return float (str )
198
201
199
202
@staticmethod
200
203
def _validate_time (time ):
@@ -222,34 +225,28 @@ def _write_temp_to_buf(self, buf):
222
225
struct .pack_into (">H" , buf , 4 , int (self ._temp * 100 ))
223
226
224
227
def is_next_day (self ):
225
- """Return if event is on the next day."""
226
228
return self ._is_next_day
227
229
228
230
def set_next_day (self , is_next_day ):
229
- """Set if event is on the next day."""
230
231
self ._is_next_day = is_next_day
231
232
232
233
def get_time (self ):
233
- """Return event time."""
234
234
return self ._time
235
235
236
236
def __str__ (self ):
237
- """Return event as string."""
238
237
return f"{ math .floor (self ._time / 60 )} :{ f'{ self ._time % 60 :0>2} ' } ,{ f'{ self ._temp :.1f} ' } "
239
238
240
239
def serialize (self ):
241
- """Serialize event to bytes."""
242
240
result = bytearray (6 )
243
241
self ._write_time_to_buf (result )
244
242
self ._write_temp_to_buf (result )
245
243
return result
246
244
247
245
248
246
class ScheduleSettings (t .LVBytes ):
249
- """Schedule settings object. """
247
+ """Schedule settings object"""
250
248
251
249
def __new__ (cls , value ):
252
- """Create ScheduleSettings object from bytes or string."""
253
250
day_selection = None
254
251
events = [None ] * 4
255
252
if isinstance (value , bytes ):
@@ -304,7 +301,7 @@ def _verify_day_selection_in_str(days):
304
301
if len (days ) != len (set (days )):
305
302
raise ValueError ("Duplicate day names present" )
306
303
for d in days :
307
- if d not in DAYS_MAP :
304
+ if d not in DAYS_MAP . keys () :
308
305
raise ValueError (
309
306
f"String: { d } is not a valid day name, valid names: mon, tue, wed, thu, fri, sat, sun"
310
307
)
@@ -316,8 +313,8 @@ def _read_day_selection(value):
316
313
byte = struct .unpack_from ("c" , value , offset = 1 )[0 ][0 ]
317
314
if byte & 0x01 :
318
315
raise ValueError ("Incorrect day selected" )
319
- for i , v in DAYS_MAP . items () :
320
- if byte & v :
316
+ for i in DAYS_MAP :
317
+ if byte & DAYS_MAP [ i ] :
321
318
day_selection .append (i )
322
319
ScheduleSettings ._verify_day_selection_in_str (day_selection )
323
320
elif isinstance (value , str ):
@@ -358,7 +355,6 @@ def _get_day_selection_byte(day_selection):
358
355
return byte
359
356
360
357
def __str__ (self ):
361
- """Return ScheduleSettings as string."""
362
358
day_selection = ScheduleSettings ._read_day_selection (self )
363
359
events = [None ] * 4
364
360
for i in range (4 ):
@@ -388,6 +384,8 @@ class AqaraThermostatSpecificCluster(XiaomiAqaraE1Cluster):
388
384
SCHEDULE_SETTINGS : ("schedule_settings" , ScheduleSettings , True ),
389
385
SENSOR : ("sensor" , t .uint8_t , True ),
390
386
BATTERY_PERCENTAGE : ("battery_percentage" , t .uint8_t , True ),
387
+ SENSOR_TEMP : ("sensor_temp" , t .uint32_t , True ),
388
+ SENSOR_ATTR : (SENSOR_ATTR_NAME , t .LVBytes , True ),
391
389
}
392
390
)
393
391
@@ -402,6 +400,166 @@ def _update_attribute(self, attrid, value):
402
400
)
403
401
super ()._update_attribute (attrid , value )
404
402
403
+ def aqaraHeader (self , counter : int , params : bytearray , action : int ) -> bytearray :
404
+ """Create Aqara header for setting external sensor."""
405
+ header = bytes ([0xAA , 0x71 , len (params ) + 3 , 0x44 , counter ])
406
+ integrity = 512 - sum (header )
407
+
408
+ return header + bytes ([integrity , action , 0x41 , len (params )])
409
+
410
+ def _float_to_hex (self , f ):
411
+ """Convert float to hex."""
412
+ return hex (struct .unpack ("<I" , struct .pack ("<f" , f ))[0 ])
413
+
414
+ async def write_attributes (
415
+ self , attributes : dict [str | int , Any ], manufacturer : int | None = None
416
+ ) -> list :
417
+ """Write attributes to device with internal 'attributes' validation."""
418
+ sensor = bytearray .fromhex ("00158d00019d1b98" )
419
+ attrs = {}
420
+
421
+ for attr , value in attributes .items ():
422
+ # implemented with help from https://github.com/Koenkk/zigbee-herdsman-converters/blob/master/devices/xiaomi.js
423
+
424
+ if attr == SENSOR_TEMP :
425
+ # set external sensor temp. this function expect value to be passed multiplied by 100
426
+ temperatureBuf = bytearray .fromhex (
427
+ self ._float_to_hex (round (float (value )))[2 :]
428
+ )
429
+
430
+ params = sensor
431
+ params += bytes ([0x00 , 0x01 , 0x00 , 0x55 ])
432
+ params += temperatureBuf
433
+
434
+ attrs = {}
435
+ attrs [SENSOR_ATTR_NAME ] = self .aqaraHeader (0x12 , params , 0x05 ) + params
436
+
437
+ elif attr == SENSOR :
438
+ # set internal/external temperature sensor
439
+ device = bytearray .fromhex (
440
+ ("%s" % (self .endpoint .device .ieee )).replace (":" , "" )
441
+ )
442
+ timestamp = bytes (reversed (t .uint32_t (int (time .time ())).serialize ()))
443
+
444
+ if value == 0 :
445
+ # internal sensor
446
+ params1 = timestamp
447
+ params1 += bytes ([0x3D , 0x05 ])
448
+ params1 += device
449
+ params1 += bytes (
450
+ [
451
+ 0x00 ,
452
+ 0x00 ,
453
+ 0x00 ,
454
+ 0x00 ,
455
+ 0x00 ,
456
+ 0x00 ,
457
+ 0x00 ,
458
+ 0x00 ,
459
+ 0x00 ,
460
+ 0x00 ,
461
+ 0x00 ,
462
+ 0x00 ,
463
+ ]
464
+ )
465
+
466
+ params2 = timestamp
467
+ params2 += bytes ([0x3D , 0x04 ])
468
+ params2 += device
469
+ params2 += bytes (
470
+ [
471
+ 0x00 ,
472
+ 0x00 ,
473
+ 0x00 ,
474
+ 0x00 ,
475
+ 0x00 ,
476
+ 0x00 ,
477
+ 0x00 ,
478
+ 0x00 ,
479
+ 0x00 ,
480
+ 0x00 ,
481
+ 0x00 ,
482
+ 0x00 ,
483
+ ]
484
+ )
485
+
486
+ attrs1 = {}
487
+ attrs1 [SENSOR_ATTR_NAME ] = (
488
+ self .aqaraHeader (0x12 , params1 , 0x04 ) + params1
489
+ )
490
+ attrs [SENSOR_ATTR_NAME ] = (
491
+ self .aqaraHeader (0x13 , params2 , 0x04 ) + params2
492
+ )
493
+
494
+ result = await super ().write_attributes (attrs1 , manufacturer )
495
+ else :
496
+ # external sensor
497
+ params1 = timestamp
498
+ params1 += bytes ([0x3D , 0x04 ])
499
+ params1 += device
500
+ params1 += sensor
501
+ params1 += bytes ([0x00 , 0x01 , 0x00 , 0x55 ])
502
+ params1 += bytes (
503
+ [
504
+ 0x13 ,
505
+ 0x0A ,
506
+ 0x02 ,
507
+ 0x00 ,
508
+ 0x00 ,
509
+ 0x64 ,
510
+ 0x04 ,
511
+ 0xCE ,
512
+ 0xC2 ,
513
+ 0xB6 ,
514
+ 0xC8 ,
515
+ ]
516
+ )
517
+ params1 += bytes ([0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x01 , 0x3D ])
518
+ params1 += bytes ([0x64 ])
519
+ params1 += bytes ([0x65 ])
520
+
521
+ params2 = timestamp
522
+ params2 += bytes ([0x3D , 0x05 ])
523
+ params2 += device
524
+ params2 += sensor
525
+ params2 += bytes ([0x08 , 0x00 , 0x07 , 0xFD ])
526
+ params2 += bytes (
527
+ [
528
+ 0x16 ,
529
+ 0x0A ,
530
+ 0x02 ,
531
+ 0x0A ,
532
+ 0xC9 ,
533
+ 0xE8 ,
534
+ 0xB1 ,
535
+ 0xB8 ,
536
+ 0xD4 ,
537
+ 0xDA ,
538
+ 0xCF ,
539
+ 0xDF ,
540
+ 0xC0 ,
541
+ 0xEB ,
542
+ ]
543
+ )
544
+ params2 += bytes ([0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x01 , 0x3D ])
545
+ params2 += bytes ([0x04 ])
546
+ params2 += bytes ([0x65 ])
547
+
548
+ attrs1 = {}
549
+ attrs1 [SENSOR_ATTR_NAME ] = (
550
+ self .aqaraHeader (0x12 , params1 , 0x02 ) + params1
551
+ )
552
+ attrs [SENSOR_ATTR_NAME ] = (
553
+ self .aqaraHeader (0x13 , params2 , 0x02 ) + params2
554
+ )
555
+
556
+ result = await super ().write_attributes (attrs1 , manufacturer )
557
+ else :
558
+ attrs [attr ] = value
559
+
560
+ result = await super ().write_attributes (attrs , manufacturer )
561
+ return result
562
+
405
563
406
564
class AGL001 (XiaomiCustomDevice ):
407
565
"""Aqara E1 Radiator Thermostat (AGL001) Device."""
0 commit comments