Skip to content

Commit

Permalink
Merge pull request #21 from RWTH-EBC/19-api-to-thethingsnetwork
Browse files Browse the repository at this point in the history
19 api to thethingsnetwork
  • Loading branch information
Jun-Jiang-92 authored Oct 28, 2024
2 parents 03dab12 + 81bff73 commit 06b8880
Show file tree
Hide file tree
Showing 7 changed files with 329 additions and 55 deletions.
10 changes: 6 additions & 4 deletions ebcmeasurements/Base/DataLogger.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def __init__(
:param data_outputs_mapping: Mapping of multiple data outputs
:param data_rename_mapping: Mapping of rename for data sources and data outputs, None to use default names
provided by data sources
:param **kwargs:
:param kwargs:
'data_rename_mapping_explicit': bool: Default is False, if set True, all variable keys in rename mapping
will be checked, if they are available in data source
'auto_prefix_for_duplicate': bool: Default is True, if set True, all variable names will be prefixed with
Expand Down Expand Up @@ -145,6 +145,9 @@ def __init__(
else:
pass

# Count of logging
self.log_count = 0

def _check_data_rename_mapping_input(self, data_rename_mapping: dict, explicit: bool):
"""Check input dict of data rename mapping"""
def _check_data_source_name(data_source_name):
Expand Down Expand Up @@ -303,7 +306,6 @@ def run_data_logging(self, interval: int | float, duration: int | float | None):
start_time = time.time()
end_time = None if duration is None else start_time + duration
next_log_time = start_time # Init next logging time
log_count = 0 # Init count of logging

logger.info(f"Starting data logging at time {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}")
if end_time is None:
Expand All @@ -324,8 +326,8 @@ def run_data_logging(self, interval: int | float, duration: int | float | None):
data = self.read_data_all_sources()

# Log count
log_count += 1 # Update log counter
print(f"TimeTrigger - Logging count(s): {log_count}") # Print log counter to console
self.log_count += 1 # Update log counter
print(f"TimeTrigger - {hex(id(self))} - Logging count(s): {self.log_count}") # Print count to console

# Log data to each output
self.log_data_all_outputs(data, timestamp)
Expand Down
1 change: 1 addition & 0 deletions ebcmeasurements/Base/UML_Class.puml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ package Base.DataLogger{
# _data_rename_mapping: dict[str, dict[str, dict[str, str]]] | None
# _all_duplicates: dict[str, dict[str, list[str]]]
# _all_variable_names_dict: dict[str, dict[str, tuple[str, ...]]]
+ log_count: int
+ __init__(\n data_sources_mapping: dict[str, DataSource.DataSourceBase],\n data_outputs_mapping: dict[str, DataOutput.DataOutputBase],\n data_rename_mapping: dict[str, dict[str, dict[str, str]]],\n **kwargs\n)
# _check_data_rename_mapping_input(data_rename_mapping: dict, explicit: bool)
# _init_data_rename_mapping(data_rename_mapping: dict): dict[str, dict[str, dict[str, str]]]
Expand Down
170 changes: 124 additions & 46 deletions ebcmeasurements/Mqtt/MqttDataSourceOutput.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
from ebcmeasurements.Base import DataOutput, DataSourceOutput, DataLogger
import paho.mqtt.client as mqtt
from typing import TypedDict
import threading
import time
import sys
import logging.config
Expand All @@ -14,15 +14,30 @@
class MqttDataSourceOutput(DataSourceOutput.DataSourceOutputBase):
class MqttDataSource(DataSourceOutput.DataSourceOutputBase.SystemDataSource):
"""MQTT implementation of nested class SystemDataSource"""
def __init__(self, system: mqtt.Client, all_topics: tuple[str, ...]):
def __init__(
self,
system: mqtt.Client,
all_topics: tuple[str, ...],
all_variable_names: tuple[str, ...] = None
):
"""
Initialization of MqttDataSource instance
:param system: MQTT client instance
:param all_topics: All topics to be subscribed
:param all_variable_names: All variable names contained in the subscribed topics. This parameter is for
cases involving payloads, as it can contain multiple values. Default is None, the variable names will be
same as each topic name
"""
logger.info("Initializing MqttDataSource ...")
super().__init__(system)
self._all_variable_names = all_topics
self._all_topics = all_topics
self._all_variable_names = all_variable_names if all_variable_names is not None else all_topics
self._data_buffer = {}

def mqtt_subscribe(self):
qos = 0
self.system.subscribe(list(zip(self._all_variable_names, [qos] * len(self._all_variable_names))))
self.system.subscribe(list(zip(self._all_topics, [qos] * len(self._all_variable_names))))

def synchronize_data_buffer(self, data: dict[str, float]):
self._data_buffer.update(data)
Expand All @@ -35,42 +50,53 @@ def read_data(self) -> dict:

class MqttDataOutput(DataSourceOutput.DataSourceOutputBase.SystemDataOutput):
"""MQTT implementation of nested class SystemDataOutput"""
def __init__(self, system: mqtt.Client, all_topics: tuple[str, ...]):
def __init__(
self,
system: mqtt.Client,
all_topics: tuple[str, ...],
all_variable_names: tuple[str, ...] = None
):
"""
Initialization of MqttDataOutput instance
:param system: MQTT client instance
:param all_topics: All topics to be published
:param all_variable_names: All variable names contained in the published topics. This parameter is for
cases involving payloads, as it can contain multiple values. Default is None, the variable names will be
same as each topic name
"""
logger.info("Initializing MqttDataOutput ...")
super().__init__(system, log_time_required=False) # No requires of log time
self._all_variable_names = all_topics
self._all_topics = all_topics
self._all_variable_names = all_variable_names if all_variable_names is not None else all_topics

def log_data(self, data: dict):
if data:
data_cleaned = self.clean_keys_with_none_values(data) # Clean none values
if data_cleaned:
if self.system.is_connected():
for topic, value in data_cleaned.items():
self.system.publish(topic, value)
else:
logger.warning("Unable to publish the data due to disconnection")
else:
logger.info("No more keys after cleaning the data, skipping logging ...")
else:
if not data:
logger.debug("No keys available in data, skipping logging ...")
return

class MqttDataLoggerConfig(TypedDict):
"""Typed dict for logger configuration of nested class MqttDataLogger """
data_outputs_mapping: dict[str, DataOutput.DataOutputBase]
data_rename_mapping: dict[str, dict[str, str]] | None
data_cleaned = self.clean_keys_with_none_values(data) # Clean none values
if not data_cleaned:
logger.info("No more keys after cleaning the data, skipping logging ...")
return

if self.system.is_connected():
for topic, value in data_cleaned.items():
self.system.publish(topic, value)
else:
logger.warning("Unable to publish the data due to disconnection")

class MqttDataLogger(DataLogger.DataLoggerBase):
"""MQTT implementation of nested class MqttDataLogger, triggerd by 'on_message'"""
class MqttDataOnMsgLogger(DataLogger.DataLoggerBase):
"""MQTT implementation of nested class MqttDataOnMsgLogger, triggerd by 'on_message'"""
def __init__(
self,
data_source,
data_source: object,
data_outputs_mapping: dict[str: DataOutput.DataOutputBase],
data_rename_mapping: dict[str: dict[str: str]] | None = None,
):
"""MQTT 'on message' triggerd data logger"""
logger.info("Initializing MqttDataLogger ...")
self.data_source_name = str(id(data_source)) # Get ID as data source name
self.log_count = 0 # Init count of logging
logger.info("Initializing MqttDataOnMsgLogger ...")
self.data_source_name = str(hex(id(data_source))) # Get ID as data source name
super().__init__(
data_sources_mapping={self.data_source_name: data_source},
data_outputs_mapping=data_outputs_mapping,
Expand All @@ -84,7 +110,7 @@ def run_data_logging(self, data):

# Log count
self.log_count += 1 # Update log counter
print(f"MQTT - Logging count(s): {self.log_count}") # Print log counter to console
print(f"MQTTOnMsgTrigger - {hex(id(self))} - Logging count(s): {self.log_count}") # Print count to console

# Log data to each output
self.log_data_all_outputs({self.data_source_name: data}, timestamp)
Expand All @@ -97,7 +123,8 @@ def __init__(
username: str = None,
password: str = None,
subscribe_topics: list[str] | None = None,
publish_topics: list[str] | None = None
publish_topics: list[str] | None = None,
**kwargs
):
"""
Initialization of MqttDataSourceOutput instance
Expand All @@ -109,6 +136,9 @@ def __init__(
:param password: See package paho.mqtt.client
:param subscribe_topics: List of topics to be subscribed from MQTT broker, None to deactivate subscribe function
:param publish_topics: List of topics to be published to MQTT broker, None to deactivate publish function
:param kwargs:
'data_source_all_variable_names': List of all variable names for data source by subscribed topics
'data_output_all_variable_names': List of all variable names for data output by published topics
"""
logger.info("Initializing MqttDataSourceOutput ...")
self.broker = broker
Expand All @@ -124,18 +154,26 @@ def __init__(

# Init DataSource
if subscribe_topics is not None:
self._data_source = self.MqttDataSource(system=self.system, all_topics=tuple(subscribe_topics))
self._data_source = self.MqttDataSource(
system=self.system,
all_topics=tuple(subscribe_topics),
all_variable_names=kwargs.get('data_source_all_variable_names', None)
)
else:
self._data_source = None

# Init DataOutput
if publish_topics is not None:
self._data_output = self.MqttDataOutput(system=self.system, all_topics=tuple(publish_topics))
self._data_output = self.MqttDataOutput(
system=self.system,
all_topics=tuple(publish_topics),
all_variable_names=kwargs.get('data_output_all_variable_names', None)
)
else:
self._data_output = None

# Init DataLogger
self._data_logger = None
# Init On-Message-DataLogger
self._on_msg_data_logger = None

# Assign callback functions
self.system.on_connect = self.on_connect
Expand All @@ -153,7 +191,7 @@ def __init__(

def __del__(self):
"""Destructor method to ensure MQTT disconnected"""
self._mqtt_stop()
self.mqtt_stop()

def _mqtt_connect(self):
"""Try to connect to MQTT broker only once"""
Expand All @@ -163,7 +201,9 @@ def _mqtt_connect(self):
try:
logger.info(f"Connecting to broker: {self.broker} ...")
self.system.connect(self.broker, self.port, self.keepalive) # Connect MQTT
self._mqtt_start() # Start network
mqtt_thread = threading.Thread(target=self._mqtt_loop_forever)
mqtt_thread.start()
logger.info(f"MQTT loop started")
except Exception as e:
logger.warning(f"Failed to connect to MQTT broker '{self.broker}', port '{self.port}': {e}")

Expand All @@ -185,7 +225,12 @@ def _mqtt_start(self):
logger.info("Starting network loop ...")
self.system.loop_start()

def _mqtt_stop(self):
def _mqtt_loop_forever(self):
"""Run the network loop forever"""
logger.info("Starting network loop forever ...")
self.system.loop_forever()

def mqtt_stop(self):
"""Stop the network loop and disconnect the broker"""
logger.info("Stopping network loop and disconnecting ...")
self.system.loop_stop()
Expand All @@ -208,8 +253,8 @@ def on_message(self, client, userdata, msg):
data = {topic: float(payload)}
if self._data_source is not None:
self._data_source.synchronize_data_buffer(data) # Synchronize data buffer of data source
if self._data_logger is not None:
self._data_logger.run_data_logging(data) # Trigger MQTT data logger
if self._on_msg_data_logger is not None:
self._on_msg_data_logger.run_data_logging(data) # Trigger MQTT data logger

def on_publish(self, client, userdata, mid):
logger.debug(f"Message published with mid: {mid}")
Expand All @@ -220,6 +265,42 @@ def on_disconnect(self, client, userdata, rc):
logger.warning("Unexpected disconnection. Attempting to reconnect...")
self._mqtt_connect_with_retry(max_retries=100, retry_period=10)

def activate_on_msg_data_logger(
self,
data_outputs_mapping: dict[str: DataOutput.DataOutputBase],
data_rename_mapping: dict[str: dict[str: str]] | None = None
):
"""
Activate the on-message data logger
The format of data_outputs_mapping is as follows:
{
'<output1_name>': <instance1 of class DataOutput>,
'<output2_name>': <instance2 of class DataOutput>,
...
}
The format of data_rename_mapping is as follows:
{
<'output1_name'>: {
<variable_name_in_source1>: <new_variable_name_in_output1>,
...
},
<'output2_name'>: {
<variable_name_in_source1>: <new_variable_name_in_output2>,
...
},
...
}
"""
# Init the data logger
self._on_msg_data_logger = self.MqttDataOnMsgLogger(
data_source=self._data_source,
data_outputs_mapping=data_outputs_mapping,
data_rename_mapping=data_rename_mapping
)
logger.info("The MQTT on-message data logger is activated")

@property
def data_source(self) -> 'MqttDataSourceOutput.MqttDataSource':
"""Instance of MqttDataSource"""
Expand All @@ -235,12 +316,9 @@ def data_output(self) -> 'MqttDataSourceOutput.MqttDataOutput':
return self._data_output

@property
def data_logger(self) -> 'MqttDataSourceOutput.MqttDataLogger':
def on_msg_data_logger(self) -> 'MqttDataSourceOutput.MqttDataOnMsgLogger':
"""MQTT data logger"""
return self._data_logger

@data_logger.setter
def data_logger(self, config: 'MqttDataSourceOutput.MqttDataLoggerConfig'):
"""Set MQTT data logger"""
self._data_logger = self.MqttDataLogger(
self.data_source, config['data_outputs_mapping'], config.get('data_rename_mapping'))
if self._on_msg_data_logger is None:
raise AttributeError(
"On message data logger unavailable, it must be activated with 'activate_on_msg_data_logger'")
return self._on_msg_data_logger
Loading

0 comments on commit 06b8880

Please sign in to comment.