Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zigpy serial protocol #177

Merged
merged 7 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ readme = "README.md"
license = {text = "GPL-3.0"}
requires-python = ">=3.8"
dependencies = [
"zigpy>=0.60.0",
"zigpy>=0.70.0",
]

[tool.setuptools.packages.find]
Expand Down Expand Up @@ -43,6 +43,7 @@ ignore_errors = true

[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"

[tool.flake8]
exclude = [".venv", ".git", ".tox", "docs", "venv", "bin", "lib", "deps", "build"]
Expand Down
9 changes: 0 additions & 9 deletions tests/async_mock.py

This file was deleted.

138 changes: 37 additions & 101 deletions tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
"""Tests for API."""

import asyncio
from unittest import mock

import pytest
import serial
import zigpy.config
import zigpy.exceptions
import zigpy.types as t

from zigpy_xbee import api as xbee_api, types as xbee_t, uart
from zigpy_xbee import api as xbee_api, types as xbee_t
from zigpy_xbee.exceptions import ATCommandError, ATCommandException, InvalidCommand
from zigpy_xbee.zigbee.application import ControllerApplication

import tests.async_mock as mock

DEVICE_CONFIG = zigpy.config.SCHEMA_DEVICE(
{
zigpy.config.CONF_DEVICE_PATH: "/dev/null",
Expand All @@ -26,24 +24,49 @@
def api():
"""Sample XBee API fixture."""
api = xbee_api.XBee(DEVICE_CONFIG)
api._uart = mock.MagicMock()
api._uart = mock.AsyncMock()
return api


async def test_connect(monkeypatch):
async def test_connect():
"""Test connect."""
api = xbee_api.XBee(DEVICE_CONFIG)
monkeypatch.setattr(uart, "connect", mock.AsyncMock())
await api.connect()
api._command = mock.AsyncMock(spec=api._command)

with mock.patch("zigpy_xbee.uart.connect"):
await api.connect()


async def test_connect_initial_timeout_success():
"""Test connect, initial command times out."""
api = xbee_api.XBee(DEVICE_CONFIG)
api._at_command = mock.AsyncMock(side_effect=asyncio.TimeoutError)
api.init_api_mode = mock.AsyncMock(return_value=True)

with mock.patch("zigpy_xbee.uart.connect"):
await api.connect()


async def test_connect_initial_timeout_failure():
"""Test connect, initial command times out."""
api = xbee_api.XBee(DEVICE_CONFIG)
api._at_command = mock.AsyncMock(side_effect=asyncio.TimeoutError)
api.init_api_mode = mock.AsyncMock(return_value=False)

with mock.patch("zigpy_xbee.uart.connect") as mock_connect:
with pytest.raises(zigpy.exceptions.APIException):
await api.connect()

assert mock_connect.return_value.disconnect.mock_calls == [mock.call()]

def test_close(api):

async def test_disconnect(api):
"""Test connection close."""
uart = api._uart
api.close()
await api.disconnect()

assert api._uart is None
assert uart.close.call_count == 1
assert uart.disconnect.call_count == 1


def test_commands():
Expand Down Expand Up @@ -599,97 +622,10 @@ def test_handle_many_to_one_rri(api):
api._handle_many_to_one_rri(ieee, nwk, 0)


@mock.patch.object(xbee_api.XBee, "_at_command", new_callable=mock.AsyncMock)
@mock.patch.object(uart, "connect", return_value=mock.MagicMock())
async def test_probe_success(mock_connect, mock_at_cmd):
"""Test device probing."""

res = await xbee_api.XBee.probe(DEVICE_CONFIG)
assert res is True
assert mock_connect.call_count == 1
assert mock_connect.await_count == 1
assert mock_connect.call_args[0][0] == DEVICE_CONFIG
assert mock_at_cmd.call_count == 1
assert mock_connect.return_value.close.call_count == 1


@mock.patch.object(xbee_api.XBee, "init_api_mode", return_value=True)
@mock.patch.object(xbee_api.XBee, "_at_command", side_effect=asyncio.TimeoutError)
@mock.patch.object(uart, "connect", return_value=mock.MagicMock())
async def test_probe_success_api_mode(mock_connect, mock_at_cmd, mock_api_mode):
"""Test device probing."""

res = await xbee_api.XBee.probe(DEVICE_CONFIG)
assert res is True
assert mock_connect.call_count == 1
assert mock_connect.await_count == 1
assert mock_connect.call_args[0][0] == DEVICE_CONFIG
assert mock_at_cmd.call_count == 1
assert mock_api_mode.call_count == 1
assert mock_connect.return_value.close.call_count == 1


@mock.patch.object(xbee_api.XBee, "init_api_mode")
@mock.patch.object(xbee_api.XBee, "_at_command", side_effect=asyncio.TimeoutError)
@mock.patch.object(uart, "connect", return_value=mock.MagicMock())
@pytest.mark.parametrize(
"exception",
(asyncio.TimeoutError, serial.SerialException, zigpy.exceptions.APIException),
)
async def test_probe_fail(mock_connect, mock_at_cmd, mock_api_mode, exception):
"""Test device probing fails."""

mock_api_mode.side_effect = exception
mock_api_mode.reset_mock()
mock_at_cmd.reset_mock()
mock_connect.reset_mock()
res = await xbee_api.XBee.probe(DEVICE_CONFIG)
assert res is False
assert mock_connect.call_count == 1
assert mock_connect.await_count == 1
assert mock_connect.call_args[0][0] == DEVICE_CONFIG
assert mock_at_cmd.call_count == 1
assert mock_api_mode.call_count == 1
assert mock_connect.return_value.close.call_count == 1


@mock.patch.object(xbee_api.XBee, "init_api_mode", return_value=False)
@mock.patch.object(xbee_api.XBee, "_at_command", side_effect=asyncio.TimeoutError)
@mock.patch.object(uart, "connect", return_value=mock.MagicMock())
async def test_probe_fail_api_mode(mock_connect, mock_at_cmd, mock_api_mode):
"""Test device probing fails."""

mock_api_mode.reset_mock()
mock_at_cmd.reset_mock()
mock_connect.reset_mock()
res = await xbee_api.XBee.probe(DEVICE_CONFIG)
assert res is False
assert mock_connect.call_count == 1
assert mock_connect.await_count == 1
assert mock_connect.call_args[0][0] == DEVICE_CONFIG
assert mock_at_cmd.call_count == 1
assert mock_api_mode.call_count == 1
assert mock_connect.return_value.close.call_count == 1


@mock.patch.object(xbee_api.XBee, "connect", return_value=mock.MagicMock())
async def test_xbee_new(conn_mck):
"""Test new class method."""
api = await xbee_api.XBee.new(mock.sentinel.application, DEVICE_CONFIG)
assert isinstance(api, xbee_api.XBee)
assert conn_mck.call_count == 1
assert conn_mck.await_count == 1


@mock.patch.object(xbee_api.XBee, "connect", return_value=mock.MagicMock())
async def test_connection_lost(conn_mck):
async def test_connection_lost(api):
"""Test `connection_lost` propagataion."""
api = await xbee_api.XBee.new(mock.sentinel.application, DEVICE_CONFIG)
await api.connect()

app = api._app = mock.MagicMock()
api.set_application(mock.AsyncMock())

err = RuntimeError()
api.connection_lost(err)

app.connection_lost.assert_called_once_with(err)
api._app.connection_lost.assert_called_once_with(err)
34 changes: 13 additions & 21 deletions tests/test_application.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Tests for ControllerApplication."""

import asyncio
from unittest import mock

import pytest
import zigpy.config as config
Expand All @@ -15,8 +16,6 @@
import zigpy_xbee.types as xbee_t
from zigpy_xbee.zigbee import application

import tests.async_mock as mock

APP_CONFIG = {
config.CONF_DEVICE: {
config.CONF_DEVICE_PATH: "/dev/null",
Expand Down Expand Up @@ -374,13 +373,12 @@ def init_api_mode_mock():
api_mode = api_config_succeeds
return api_config_succeeds

with mock.patch("zigpy_xbee.api.XBee") as XBee_mock:
api_mock = mock.MagicMock()
api_mock._at_command = mock.AsyncMock(side_effect=_at_command_mock)
api_mock.init_api_mode = mock.AsyncMock(side_effect=init_api_mode_mock)

XBee_mock.new = mock.AsyncMock(return_value=api_mock)
api_mock = mock.MagicMock()
api_mock._at_command = mock.AsyncMock(side_effect=_at_command_mock)
api_mock.init_api_mode = mock.AsyncMock(side_effect=init_api_mode_mock)
api_mock.connect = mock.AsyncMock()

with mock.patch("zigpy_xbee.api.XBee", return_value=api_mock):
await app.connect()

app.form_network = mock.AsyncMock()
Expand Down Expand Up @@ -418,23 +416,17 @@ async def test_start_network(app):

async def test_start_network_no_api_mode(app):
"""Test start network when not in API mode."""
await _test_start_network(app, ai_status=0x00, api_mode=False)
assert app.state.node_info.nwk == 0x0000
assert app.state.node_info.ieee == t.EUI64(range(1, 9))
assert app._api.init_api_mode.call_count == 1
assert app._api._at_command.call_count >= 16
with pytest.raises(asyncio.TimeoutError):
await _test_start_network(app, ai_status=0x00, api_mode=False)


async def test_start_network_api_mode_config_fails(app):
"""Test start network when not when API config fails."""
with pytest.raises(zigpy.exceptions.ControllerException):
with pytest.raises(asyncio.TimeoutError):
await _test_start_network(
app, ai_status=0x00, api_mode=False, api_config_succeeds=False
)

assert app._api.init_api_mode.call_count == 1
assert app._api._at_command.call_count == 1


async def test_permit(app):
"""Test permit joins."""
Expand Down Expand Up @@ -559,11 +551,11 @@ async def test_force_remove(app):

async def test_shutdown(app):
"""Test application shutdown."""
mack_close = mock.MagicMock()
app._api.close = mack_close
await app.shutdown()
mock_disconnect = mock.AsyncMock()
app._api.disconnect = mock_disconnect
await app.disconnect()
assert app._api is None
assert mack_close.call_count == 1
assert mock_disconnect.call_count == 1


async def test_remote_at_cmd(app, device):
Expand Down
26 changes: 9 additions & 17 deletions tests/test_uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,12 @@ def test_command_mode_send(gw):
gw._transport.write.assert_called_once_with(data)


def test_close(gw):
async def test_disconnect(gw):
"""Test closing connection."""
gw.close()
assert gw._transport.close.call_count == 1
transport = gw._transport
asyncio.get_running_loop().call_soon(gw.connection_lost, None)
await gw.disconnect()
assert transport.close.call_count == 1


def test_data_received_chunk_frame(gw):
Expand Down Expand Up @@ -228,22 +230,12 @@ def test_unescape_underflow(gw):

def test_connection_lost_exc(gw):
"""Test cannection lost callback is called."""
gw._connected_future = asyncio.Future()

gw.connection_lost(ValueError())

conn_lost = gw._api.connection_lost
assert conn_lost.call_count == 1
assert isinstance(conn_lost.call_args[0][0], Exception)
assert gw._connected_future.done()
assert gw._connected_future.exception()
err = RuntimeError()
gw.connection_lost(err)
assert gw._api.connection_lost.mock_calls == [mock.call(err)]


def test_connection_closed(gw):
"""Test connection closed."""
gw._connected_future = asyncio.Future()
gw.connection_lost(None)

assert gw._api.connection_lost.call_count == 0
assert gw._connected_future.done()
assert gw._connected_future.result() is True
assert gw._api.connection_lost.mock_calls == [mock.call(None)]
Loading