Skip to content

Commit

Permalink
P2P rate limit (#1596)
Browse files Browse the repository at this point in the history
  • Loading branch information
opieters authored Nov 5, 2024
1 parent 8ef623d commit e98b284
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 12 deletions.
2 changes: 2 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ nav_order: 2

# Changelog

- Added rate limit (in packets per second) option to P2PConnection.
- Fix typo in management procedure (`nm_invididual_address_write` was renamed to `nm_individual_address_write`)

# 3.3.0 Climate humidity 2024-10-20

Expand Down
2 changes: 1 addition & 1 deletion examples/example_write_individual_address.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def main(argv: list[str]) -> int:

async with XKNX() as xknx:
individual_address = IndividualAddress(address)
await procedures.nm_invididual_address_write(xknx, individual_address)
await procedures.nm_individual_address_write(xknx, individual_address)

return 0

Expand Down
76 changes: 76 additions & 0 deletions test/management_tests/management_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,79 @@ async def test_broadcast_message():

with pytest.raises(TypeError):
await xknx.management.broadcast(connect)


@pytest.mark.parametrize("rate_limit", [0, 1])
async def test_p2p_rate_limit(time_travel, rate_limit):
"""Test rate limit for P2P management connections."""
xknx = XKNX()
xknx.cemi_handler = AsyncMock()
ia = IndividualAddress("4.0.1")

def send_responses(index):
ack = Telegram(
source_address=ia,
destination_address=IndividualAddress(0),
direction=TelegramDirection.INCOMING,
tpci=tpci.TAck(index),
)
device_desc_resp = Telegram(
source_address=ia,
destination_address=IndividualAddress(0),
direction=TelegramDirection.INCOMING,
tpci=tpci.TDataConnected(index),
payload=apci.DeviceDescriptorResponse(),
)

xknx.management.process(ack)
xknx.management.process(device_desc_resp)

conn = await xknx.management.connect(ia, rate_limit)

# create task and request data
task = asyncio.create_task(
conn.request(
payload=apci.DeviceDescriptorRead(descriptor=0),
expected=apci.DeviceDescriptorResponse,
)
)

await asyncio.sleep(0)
send_responses(0)

await task

xknx.cemi_handler.reset_mock()

# create second task
task = asyncio.create_task(
conn.request(
payload=apci.DeviceDescriptorRead(descriptor=0),
expected=apci.DeviceDescriptorResponse,
)
)
await asyncio.sleep(0)

if rate_limit:
await time_travel(0.5 / rate_limit)

# the request is still queued
assert not xknx.cemi_handler.send_telegram.call_args_list

await time_travel(0.5 / rate_limit)

# the requests should be sent now, the behaviour should match no rate limit

assert xknx.cemi_handler.send_telegram.call_args_list == [
call(
Telegram(
destination_address=ia,
tpci=tpci.TDataConnected(1),
payload=apci.DeviceDescriptorRead(descriptor=0),
)
),
]

send_responses(1)

await task
12 changes: 6 additions & 6 deletions test/management_tests/procedures_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ async def test_nm_individual_address_write(time_travel):
payload=apci.IndividualAddressWrite(address=individual_address_new),
)
task = asyncio.create_task(
procedures.nm_invididual_address_write(
procedures.nm_individual_address_write(
xknx=xknx, individual_address=individual_address_new
)
)
Expand Down Expand Up @@ -309,7 +309,7 @@ async def test_nm_individual_address_write_two_devices_in_programming_mode(time_
)

task = asyncio.create_task(
procedures.nm_invididual_address_write(
procedures.nm_individual_address_write(
xknx=xknx, individual_address=individual_address_new
)
)
Expand Down Expand Up @@ -358,7 +358,7 @@ async def test_nm_individual_address_write_no_device_programming_mode(time_trave
)

task = asyncio.create_task(
procedures.nm_invididual_address_write(
procedures.nm_individual_address_write(
xknx=xknx, individual_address=individual_address_new
)
)
Expand Down Expand Up @@ -428,7 +428,7 @@ async def test_nm_individual_address_write_address_found(time_travel):
)

task = asyncio.create_task(
procedures.nm_invididual_address_write(
procedures.nm_individual_address_write(
xknx=xknx, individual_address=individual_address
)
)
Expand Down Expand Up @@ -490,7 +490,7 @@ async def test_nm_individual_address_write_programming_failed(time_travel):
payload=apci.IndividualAddressWrite(address=individual_address_new),
)
task = asyncio.create_task(
procedures.nm_invididual_address_write(
procedures.nm_individual_address_write(
xknx=xknx, individual_address=individual_address_new
)
)
Expand Down Expand Up @@ -572,7 +572,7 @@ async def test_nm_individual_address_write_address_found_other_in_programming_mo
)

task = asyncio.create_task(
procedures.nm_invididual_address_write(
procedures.nm_individual_address_write(
xknx=xknx, individual_address=individual_address
)
)
Expand Down
26 changes: 22 additions & 4 deletions xknx/management/management.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from collections.abc import AsyncGenerator, AsyncIterator, Callable, Generator
from contextlib import asynccontextmanager
import logging
import time
from typing import TYPE_CHECKING

from xknx.exceptions import (
Expand Down Expand Up @@ -81,11 +82,13 @@ def process(self, telegram: Telegram) -> None:
logger.debug("Unhandled management telegram: %r", telegram)
return

async def connect(self, address: IndividualAddress) -> P2PConnection:
async def connect(
self, address: IndividualAddress, rate_limit: int = 0
) -> P2PConnection:
"""Open a point-to-point connection to a KNX device."""
if address in self._connections:
raise ManagementConnectionError(f"Connection to {address} already exists.")
p2p_connection = P2PConnection(self.xknx, address)
p2p_connection = P2PConnection(self.xknx, address, rate_limit)
try:
await p2p_connection.connect()
except ManagementConnectionError as exc:
Expand Down Expand Up @@ -176,16 +179,21 @@ async def receive(
class P2PConnection:
"""Class to manage a point-to-point connection with a KNX device."""

def __init__(self, xknx: XKNX, address: IndividualAddress) -> None:
def __init__(
self, xknx: XKNX, address: IndividualAddress, rate_limit: int = 20
) -> None:
"""Initialize P2PConnection class."""
self.xknx = xknx
self.address = address
self.disconnect_hook: Callable[[], None]
self.rate_limit = rate_limit

self.sequence_number = self._sequence_number_generator()
self._expected_sequence_number = 0
self._connected = False

self._last_response_time: float = 0

self._ack_waiter: asyncio.Future[TAck | TNak] | None = None
self._response_waiter: asyncio.Future[Telegram] = (
asyncio.get_event_loop().create_future()
Expand Down Expand Up @@ -359,5 +367,15 @@ async def request(self, payload: APCI, expected: type[APCI] | None) -> Telegram:
raise ManagementConnectionRefused(
"Management connection disconnected by the peer."
)

if self.rate_limit:
# time in seconds since the last request operation
time_diff = time.time() - self._last_response_time
wait_time = 1 / self.rate_limit
if time_diff < wait_time:
await asyncio.sleep(wait_time - time_diff)

await self._send_data(payload)
return await self._receive(expected)
response = await self._receive(expected)
self._last_response_time = time.time()
return response
6 changes: 5 additions & 1 deletion xknx/management/procedures.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async def nm_individual_address_read(
return addresses


async def nm_invididual_address_write(
async def nm_individual_address_write(
xknx: XKNX, individual_address: IndividualAddressableType
) -> None:
"""
Expand Down Expand Up @@ -182,6 +182,10 @@ async def nm_invididual_address_write(
await xknx.cemi_handler.send_telegram(telegram)


# for backwards compatibility
nm_invididual_address_write = nm_individual_address_write


async def nm_individual_address_serial_number_read(
xknx: XKNX,
serial: bytes,
Expand Down

0 comments on commit e98b284

Please sign in to comment.