Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aqara E1 thermostat external temperature sensor #2802

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions zhaquirks/xiaomi/aqara/thermostat_agl001.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from functools import reduce
import math
import struct
import time
from typing import Any

from zigpy.profiles import zha
Expand Down Expand Up @@ -48,6 +49,10 @@
SENSOR = 0x027E
BATTERY_PERCENTAGE = 0x040A

SENSOR_TEMP = 0x1392 # Fake address to pass external sensor temperature
SENSOR_ATTR = 0xFFF2
SENSOR_ATTR_NAME = "sensor_attr"

XIAOMI_CLUSTER_ID = 0xFCC0

DAYS_MAP = {
Expand Down Expand Up @@ -379,6 +384,8 @@
SCHEDULE_SETTINGS: ("schedule_settings", ScheduleSettings, True),
SENSOR: ("sensor", t.uint8_t, True),
BATTERY_PERCENTAGE: ("battery_percentage", t.uint8_t, True),
SENSOR_TEMP: ("sensor_temp", t.uint32_t, True),
SENSOR_ATTR: (SENSOR_ATTR_NAME, t.LVBytes, True),
}
)

Expand All @@ -393,6 +400,166 @@
)
super()._update_attribute(attrid, value)

def aqaraHeader(self, counter: int, params: bytearray, action: int) -> bytearray:
"""Create Aqara header for setting external sensor."""
header = bytes([0xAA, 0x71, len(params) + 3, 0x44, counter])
integrity = 512 - sum(header)

Check warning on line 406 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L405-L406

Added lines #L405 - L406 were not covered by tests

return header + bytes([integrity, action, 0x41, len(params)])

Check warning on line 408 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L408

Added line #L408 was not covered by tests

def _float_to_hex(self, f):
"""Convert float to hex."""
return hex(struct.unpack("<I", struct.pack("<f", f))[0])

Check warning on line 412 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L412

Added line #L412 was not covered by tests

async def write_attributes(
self, attributes: dict[str | int, Any], manufacturer: int | None = None
) -> list:
"""Write attributes to device with internal 'attributes' validation."""
sensor = bytearray.fromhex("00158d00019d1b98")
attrs = {}

for attr, value in attributes.items():
# implemented with help from https://github.com/Koenkk/zigbee-herdsman-converters/blob/master/devices/xiaomi.js

if attr == SENSOR_TEMP:
# set external sensor temp. this function expect value to be passed multiplied by 100
temperatureBuf = bytearray.fromhex(

Check warning on line 426 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L426

Added line #L426 was not covered by tests
self._float_to_hex(round(float(value)))[2:]
)

params = sensor
params += bytes([0x00, 0x01, 0x00, 0x55])
params += temperatureBuf

Check warning on line 432 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L430-L432

Added lines #L430 - L432 were not covered by tests

attrs = {}
attrs[SENSOR_ATTR_NAME] = self.aqaraHeader(0x12, params, 0x05) + params

Check warning on line 435 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L434-L435

Added lines #L434 - L435 were not covered by tests

elif attr == SENSOR:
# set internal/external temperature sensor
device = bytearray.fromhex(

Check warning on line 439 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L439

Added line #L439 was not covered by tests
("%s" % (self.endpoint.device.ieee)).replace(":", "")
)
timestamp = bytes(reversed(t.uint32_t(int(time.time())).serialize()))

Check warning on line 442 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L442

Added line #L442 was not covered by tests

if value == 0:

Check warning on line 444 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L444

Added line #L444 was not covered by tests
# internal sensor
params1 = timestamp
params1 += bytes([0x3D, 0x05])
params1 += device
params1 += bytes(

Check warning on line 449 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L446-L449

Added lines #L446 - L449 were not covered by tests
[
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
]
)

params2 = timestamp
params2 += bytes([0x3D, 0x04])
params2 += device
params2 += bytes(

Check warning on line 469 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L466-L469

Added lines #L466 - L469 were not covered by tests
[
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
]
)

attrs1 = {}
attrs1[SENSOR_ATTR_NAME] = (

Check warning on line 487 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L486-L487

Added lines #L486 - L487 were not covered by tests
self.aqaraHeader(0x12, params1, 0x04) + params1
)
attrs[SENSOR_ATTR_NAME] = (

Check warning on line 490 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L490

Added line #L490 was not covered by tests
self.aqaraHeader(0x13, params2, 0x04) + params2
)

result = await super().write_attributes(attrs1, manufacturer)

Check warning on line 494 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L494

Added line #L494 was not covered by tests
else:
# external sensor
params1 = timestamp
params1 += bytes([0x3D, 0x04])
params1 += device
params1 += sensor
params1 += bytes([0x00, 0x01, 0x00, 0x55])
params1 += bytes(

Check warning on line 502 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L497-L502

Added lines #L497 - L502 were not covered by tests
[
0x13,
0x0A,
0x02,
0x00,
0x00,
0x64,
0x04,
0xCE,
0xC2,
0xB6,
0xC8,
]
)
params1 += bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x3D])
params1 += bytes([0x64])
params1 += bytes([0x65])

Check warning on line 519 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L517-L519

Added lines #L517 - L519 were not covered by tests

params2 = timestamp
params2 += bytes([0x3D, 0x05])
params2 += device
params2 += sensor
params2 += bytes([0x08, 0x00, 0x07, 0xFD])
params2 += bytes(

Check warning on line 526 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L521-L526

Added lines #L521 - L526 were not covered by tests
[
0x16,
0x0A,
0x02,
0x0A,
0xC9,
0xE8,
0xB1,
0xB8,
0xD4,
0xDA,
0xCF,
0xDF,
0xC0,
0xEB,
]
)
params2 += bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x3D])
params2 += bytes([0x04])
params2 += bytes([0x65])

Check warning on line 546 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L544-L546

Added lines #L544 - L546 were not covered by tests

attrs1 = {}
attrs1[SENSOR_ATTR_NAME] = (

Check warning on line 549 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L548-L549

Added lines #L548 - L549 were not covered by tests
self.aqaraHeader(0x12, params1, 0x02) + params1
)
attrs[SENSOR_ATTR_NAME] = (

Check warning on line 552 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L552

Added line #L552 was not covered by tests
self.aqaraHeader(0x13, params2, 0x02) + params2
)

result = await super().write_attributes(attrs1, manufacturer)

Check warning on line 556 in zhaquirks/xiaomi/aqara/thermostat_agl001.py

View check run for this annotation

Codecov / codecov/patch

zhaquirks/xiaomi/aqara/thermostat_agl001.py#L556

Added line #L556 was not covered by tests
else:
attrs[attr] = value

result = await super().write_attributes(attrs, manufacturer)
return result


class AGL001(XiaomiCustomDevice):
"""Aqara E1 Radiator Thermostat (AGL001) Device."""
Expand Down