Skip to content

Commit

Permalink
Support reception of non response payloads
Browse files Browse the repository at this point in the history
If the gateway unit sends along all data instead
of only response payloads, we need to understand
the full payload sequence.
  • Loading branch information
elupus committed Nov 10, 2024
1 parent e9f74f9 commit 6dab103
Showing 1 changed file with 51 additions and 38 deletions.
89 changes: 51 additions & 38 deletions nibe/connection/nibegw.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
from dataclasses import dataclass
import errno
from functools import reduce
import io
from io import BytesIO
from ipaddress import ip_address
import logging
from operator import xor
import socket
import struct
from typing import Dict, Literal, Optional, Union
from typing import Any, Dict, Literal, Optional, Union

from construct import (
Adapter,
Expand Down Expand Up @@ -183,42 +184,13 @@ def connection_made(self, transport):
def datagram_received(self, data: bytes, addr):
"""Callback when data is received."""
logger.debug(f"Received {hexlify(data).decode('utf-8')} from {addr}")

try:
msg = Response.parse(data)

if not self._remote_ip:
logger.debug("Pump discovered at %s", addr)
self._remote_ip = addr[0]

self.status = ConnectionStatus.CONNECTED

logger.debug(msg.fields.value)
cmd = msg.fields.value.cmd
if cmd == "MODBUS_DATA_MSG":
data: dict[int, bytes] = {
row.coil_address: row.value
for row in msg.fields.value.data
if row.coil_address != 0xFFFF
}
self._on_raw_coil_set(data)
elif cmd == "MODBUS_READ_RESP":
row = msg.fields.value.data
self._on_raw_coil_value(row.coil_address, row.value)
elif cmd == "MODBUS_WRITE_RESP":
with suppress(InvalidStateError, CancelledError, KeyError):
self._futures["write"].set_result(msg.fields.value.data.result)
elif cmd == "RMU_DATA_MSG":
self._on_rmu_data(msg.fields.value)
elif cmd == "PRODUCT_INFO_MSG":
data = msg.fields.value.data
product_info = ProductInfo(data["model"], data["version"])
with suppress(InvalidStateError, CancelledError, KeyError):
self._futures["product_info"].set_result(product_info)
self.notify_event_listeners(
self.PRODUCT_INFO_EVENT, product_info=product_info
)
elif not isinstance(cmd, EnumIntegerString):
logger.debug(f"Unknown command {cmd}")
with io.BytesIO(bytes(data)) as stream:
while block := Block.parse_stream(stream):
self._on_block(block, addr)
if remaining := stream.read():
logger.warning("Failed to parse: %s", remaining)

Check warning on line 193 in nibe/connection/nibegw.py

View check run for this annotation

Codecov / codecov/patch

nibe/connection/nibegw.py#L193

Added line #L193 was not covered by tests
except ConstructError as e:
logger.warning(
f"Ignoring packet from {addr} due to parse error: {hexlify(data).decode('utf-8')}: {e}"
Expand All @@ -230,6 +202,47 @@ def datagram_received(self, data: bytes, addr):
f"Unexpected exception during parsing packet data '{hexlify(data).decode('utf-8')}' from {addr}"
)

def _on_block(self, block: Container[Any], addr) -> None:
if block.start_byte == "RESPONSE":
self._on_response(block, addr)
else:
logger.debug(block)

Check warning on line 209 in nibe/connection/nibegw.py

View check run for this annotation

Codecov / codecov/patch

nibe/connection/nibegw.py#L209

Added line #L209 was not covered by tests

def _on_response(self, msg: Container[Any], addr) -> None:
if not self._remote_ip:
logger.debug("Pump discovered at %s", addr)
self._remote_ip = addr[0]

Check warning on line 214 in nibe/connection/nibegw.py

View check run for this annotation

Codecov / codecov/patch

nibe/connection/nibegw.py#L213-L214

Added lines #L213 - L214 were not covered by tests

self.status = ConnectionStatus.CONNECTED

logger.debug(msg.fields.value)
cmd = msg.fields.value.cmd
if cmd == "MODBUS_DATA_MSG":
data: dict[int, bytes] = {
row.coil_address: row.value
for row in msg.fields.value.data
if row.coil_address != 0xFFFF
}
self._on_raw_coil_set(data)
elif cmd == "MODBUS_READ_RESP":
row = msg.fields.value.data
self._on_raw_coil_value(row.coil_address, row.value)
elif cmd == "MODBUS_WRITE_RESP":
with suppress(InvalidStateError, CancelledError, KeyError):
self._futures["write"].set_result(msg.fields.value.data.result)
elif cmd == "RMU_DATA_MSG":
self._on_rmu_data(msg.fields.value)

Check warning on line 234 in nibe/connection/nibegw.py

View check run for this annotation

Codecov / codecov/patch

nibe/connection/nibegw.py#L234

Added line #L234 was not covered by tests
elif cmd == "PRODUCT_INFO_MSG":
data = msg.fields.value.data
product_info = ProductInfo(data["model"], data["version"])
with suppress(InvalidStateError, CancelledError, KeyError):
self._futures["product_info"].set_result(product_info)
self.notify_event_listeners(
self.PRODUCT_INFO_EVENT, product_info=product_info
)
elif not isinstance(cmd, EnumIntegerString):
logger.debug(f"Unknown command {cmd}")

Check warning on line 244 in nibe/connection/nibegw.py

View check run for this annotation

Codecov / codecov/patch

nibe/connection/nibegw.py#L243-L244

Added lines #L243 - L244 were not covered by tests

async def read_product_info(
self, timeout: float = READ_PRODUCT_INFO_TIMEOUT
) -> ProductInfo:
Expand Down Expand Up @@ -859,9 +872,9 @@ def _encode(self, obj, context, path):

Block = FocusedSeq(
"data",
"start" / Peek(StartCode),
"start_byte" / Peek(StartCode),
"data" / Switch(
this.start,
this.start_byte,
BlockTypes
),
)
Expand Down

0 comments on commit 6dab103

Please sign in to comment.