Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNM: feat: support metrics log stream #28

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/otaclient_iot_logging_server/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@

from __future__ import annotations

from enum import Enum
from queue import Queue
from typing import Literal, TypedDict

from typing_extensions import NotRequired, TypeAlias

LogsQueue: TypeAlias = "Queue[tuple[str, LogMessage]]"
LogsQueue: TypeAlias = "Queue[tuple[LogGroupType, str, LogMessage]]"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

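Change the queue item format: each entry is now a `(LogGroupType, log_stream_suffix, LogMessage)` tuple instead of `(log_stream_suffix, LogMessage)`.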



class LogGroupType(Enum):
    """Kind of log group a queued message is destined for."""

    # regular log messages
    LOG = 0
    # metrics messages
    METRICS = 1


class LogMessage(TypedDict):
Expand Down
8 changes: 4 additions & 4 deletions src/otaclient_iot_logging_server/_log_setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
import contextlib
import logging
import time
from queue import Queue

from otaclient_iot_logging_server import package_name as root_package_name
from otaclient_iot_logging_server._common import LogMessage
from otaclient_iot_logging_server._common import LogGroupType, LogMessage, LogsQueue
from otaclient_iot_logging_server.configs import server_cfg


Expand All @@ -30,7 +29,7 @@ class _LogTeeHandler(logging.Handler):

def __init__(
self,
queue: Queue[tuple[str, LogMessage]],
queue: LogsQueue,
logstream_suffix: str,
) -> None:
super().__init__()
Expand All @@ -41,6 +40,7 @@ def emit(self, record: logging.LogRecord) -> None:
with contextlib.suppress(Exception):
self._queue.put_nowait(
(
LogGroupType.LOG, # always put into log group
self._logstream_suffix,
LogMessage(
timestamp=int(time.time()) * 1000, # milliseconds
Expand All @@ -51,7 +51,7 @@ def emit(self, record: logging.LogRecord) -> None:


def config_logging(
queue: Queue[tuple[str, LogMessage]],
queue: LogsQueue,
*,
log_format: str,
level: str,
Expand Down
90 changes: 56 additions & 34 deletions src/otaclient_iot_logging_server/aws_iot_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
import awscrt.exceptions
from typing_extensions import NoReturn

from otaclient_iot_logging_server._common import LogEvent, LogMessage, LogsQueue
from otaclient_iot_logging_server._common import (
LogEvent,
LogGroupType,
LogMessage,
LogsQueue,
)
from otaclient_iot_logging_server._utils import retry
from otaclient_iot_logging_server.boto3_session import get_session
from otaclient_iot_logging_server.configs import server_cfg
Expand Down Expand Up @@ -68,40 +73,45 @@ def __init__(

self._session_config = session_config
self._log_group_name = session_config.aws_cloudwatch_log_group
self._metrics_group_name = session_config.aws_cloudwatch_metrics_log_group
self._interval = interval
self._queue: LogsQueue = queue
# NOTE: add this limitation to ensure all of the log_streams in a merge
# will definitely have entries less than MAX_LOGS_PER_PUT
self._max_logs_per_merge = min(max_logs_per_merge, self.MAX_LOGS_PER_PUT)

@retry(max_retry=16, backoff_factor=2, backoff_max=32)
def _create_log_group(self):
def _create_log_groups(self):
# TODO: (20240214) should we let the edge side iot_logging_server
# create the log group?
log_group_name, client = self._log_group_name, self._client
log_group_names = [self._log_group_name, self._metrics_group_name]
client = self._client
exc_types = self._exc_types
try:
client.create_log_group(logGroupName=log_group_name)
logger.info(f"{log_group_name=} has been created")
except exc_types.ResourceAlreadyExistsException as e:
logger.debug(
f"{log_group_name=} already existed, skip creating: {e.response}"
)
except ValueError as e:
if e.__cause__ and isinstance(e.__cause__, awscrt.exceptions.AwsCrtError):
logger.error(
(f"failed to create mtls connection to remote: {e.__cause__}")
for log_group_name in log_group_names:
try:
client.create_log_group(logGroupName=log_group_name)
logger.info(f"{log_group_name=} has been created")
except exc_types.ResourceAlreadyExistsException as e:
logger.debug(
f"{log_group_name=} already existed, skip creating: {e.response}"
)
raise e.__cause__ from None
logger.error(f"failed to create {log_group_name=}: {e!r}")
raise
except Exception as e:
logger.error(f"failed to create {log_group_name=}: {e!r}")
raise
except ValueError as e:
if e.__cause__ and isinstance(
e.__cause__, awscrt.exceptions.AwsCrtError
):
logger.error(
(f"failed to create mtls connection to remote: {e.__cause__}")
)
raise e.__cause__ from None
logger.error(f"failed to create {log_group_name=}: {e!r}")
raise
except Exception as e:
logger.error(f"failed to create {log_group_name=}: {e!r}")
raise

@retry(max_retry=16, backoff_factor=2, backoff_max=32)
def _create_log_stream(self, log_stream_name: str):
log_group_name, client = self._log_group_name, self._client
def _create_log_stream(self, log_group_name: str, log_stream_name: str):
client = self._client
exc_types = self._exc_types
try:
client.create_log_stream(
Expand All @@ -126,7 +136,9 @@ def _create_log_stream(self, log_stream_name: str):
raise

@retry(backoff_factor=2)
def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
def put_log_events(
self, log_group_name: str, log_stream_name: str, message_list: list[LogMessage]
):
"""
Ref:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html
Expand All @@ -137,7 +149,7 @@ def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
See the documentation for more details.
"""
request = LogEvent(
logGroupName=self._log_group_name,
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=message_list,
)
Expand All @@ -148,44 +160,54 @@ def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
# logger.debug(f"successfully uploaded: {response}")
except exc_types.ResourceNotFoundException as e:
logger.debug(f"{log_stream_name=} not found: {e!r}")
self._create_log_stream(log_stream_name)
self._create_log_stream(log_group_name, log_stream_name)
raise
except Exception as e:
# NOTE: for unhandled exception, we just log it and ignore,
# leave for the developer to properly handle it
# in the future!
logger.error(
f"put_log_events failure: {e!r}\n"
f"log_group_name={self._log_group_name}, \n"
f"log_group_name={log_group_name}, \n"
f"log_stream_name={log_stream_name}"
)

def thread_main(self) -> NoReturn:
"""Main entry for running this iot_logger in a thread."""
# unconditionally create log_group and log_stream, do nothing if existed.
self._create_log_group()
self._create_log_groups()

while True:
# merge LogMessages into the same source, identified by
# log_stream_suffix.
message_dict: dict[str, list[LogMessage]] = defaultdict(list)
# log_group_type and log_stream_suffix.
message_dict: dict[
(log_group_type, log_stream_suffix), list[LogMessage]
] = defaultdict(list)

_merge_count = 0
while _merge_count < self._max_logs_per_merge:
_queue = self._queue
try:
log_stream_suffix, message = _queue.get_nowait()
log_group_type, log_stream_suffix, message = _queue.get_nowait()
_merge_count += 1

message_dict[log_stream_suffix].append(message)
message_dict[(log_group_type, log_stream_suffix)].append(message)
except Empty:
break

for log_stream_suffix, logs in message_dict.items():
for (log_group_type, log_stream_suffix), logs in message_dict.items():
# get the log_group_name based on the log_group_type
log_group_name = (
self._metrics_group_name
if log_group_type == LogGroupType.METRICS
else self._log_group_name
)

with contextlib.suppress(Exception):
self.put_log_events(
log_group_name,
get_log_stream_name(
self._session_config.thing_name, log_stream_suffix
self._session_config.thing_name,
log_stream_suffix,
),
logs,
)
Expand Down
8 changes: 8 additions & 0 deletions src/otaclient_iot_logging_server/greengrass_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,14 @@ def aws_cloudwatch_log_group(self) -> str:
f"{self.account_id}/{self.profile}-edge-otaclient"
)

@computed_field
@property
def aws_cloudwatch_metrics_log_group(self) -> str:
    """Name of the CloudWatch log group used for metrics.

    Built from this config's ``region``, ``account_id`` and ``profile``,
    ending with the ``-edge-otaclient-metrics`` suffix.
    """
    region_scope = f"/aws/greengrass/edge/{self.region}/"
    group_leaf = f"{self.account_id}/{self.profile}-edge-otaclient-metrics"
    return region_scope + group_leaf

Comment on lines +240 to +247
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a new CloudWatch log group name for metrics.

@computed_field
@property
def aws_credential_refresh_url(self) -> str:
Expand Down
95 changes: 95 additions & 0 deletions src/otaclient_iot_logging_server/v1/servicer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Copyright 2022 TIER IV, INC. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""OTA Client IoT Logging Server v1 implementation."""

from __future__ import annotations

import logging
import time
from queue import Full

from otaclient_iot_logging_server._common import LogGroupType, LogMessage, LogsQueue
from otaclient_iot_logging_server.ecu_info import ECUInfo
from otaclient_iot_logging_server.v1.types import (
ErrorCode,
LogType,
PutLogRequest,
PutLogResponse,
)

logger = logging.getLogger(__name__)


class OTAClientIoTLoggingServerServicer:
    """Handlers for otaclient IoT logging service.

    Each accepted request is wrapped into a ``LogMessage`` and put onto the
    queue as a ``(log_group_type, ecu_id, message)`` tuple.
    """

    def __init__(
        self,
        *,
        ecu_info: ECUInfo,
        queue: LogsQueue,
    ):
        """
        Args:
            ecu_info: ECU information; when falsy, filtering of incoming
                logs by ECU id is disabled.
            queue: the queue that accepted log entries are put into.
        """
        self._queue = queue
        self._allowed_ecus = None

        if ecu_info:
            self._allowed_ecus = ecu_info.ecu_id_set
            logger.info(
                f"setup allowed_ecu_id from ecu_info.yaml: {ecu_info.ecu_id_set}"
            )
        else:
            logger.warning(
                "no ecu_info.yaml presented, logging upload filtering is DISABLED"
            )

    @staticmethod
    def _to_log_group_type(log_type) -> LogGroupType:
        """Convert the request's log type to the log group type.

        Only ``LogType.METRICS`` maps to the metrics group; any other value
        falls back to the regular log group.
        """
        if log_type == LogType.METRICS:
            return LogGroupType.METRICS
        return LogGroupType.LOG

    async def put_log(self, request: PutLogRequest) -> PutLogResponse:
        """Validate a put-log request and enqueue it for uploading.

        NOTE: use <ecu_id> as log_stream_suffix, each ECU has its own
            logging stream for uploading.

        Returns:
            A PutLogResponse whose code is one of:
            NO_MESSAGE (empty message), NOT_ALLOWED_ECU_ID (ECU id filtered),
            SERVER_QUEUE_FULL (queue is full, message dropped) or
            NO_FAILURE (message enqueued).
        """
        _ecu_id = request.ecu_id
        _message = request.message

        # don't allow empty message request
        if not _message:
            return PutLogResponse(code=ErrorCode.NO_MESSAGE)
        # don't allow unknown ECUs
        # if ECU id is unknown (not listed in ecu_info.yaml), drop this log.
        if self._allowed_ecus and _ecu_id not in self._allowed_ecus:
            return PutLogResponse(code=ErrorCode.NOT_ALLOWED_ECU_ID)

        _log_group_type = self._to_log_group_type(request.log_type)
        # milliseconds; scale BEFORE truncating so the fallback timestamp
        # keeps sub-second precision instead of being rounded down to
        # a whole second.
        _timestamp = request.timestamp or int(time.time() * 1000)

        _logging_msg = LogMessage(
            timestamp=_timestamp,
            message=_message,
        )
        # logger.debug(f"receive log from {_ecu_id}: {_logging_msg}")
        try:
            self._queue.put_nowait((_log_group_type, _ecu_id, _logging_msg))
        except Full:
            logger.debug(f"message dropped: {_logging_msg}")
            return PutLogResponse(code=ErrorCode.SERVER_QUEUE_FULL)

        return PutLogResponse(code=ErrorCode.NO_FAILURE)
7 changes: 4 additions & 3 deletions tests/test__log_setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from queue import Queue

import otaclient_iot_logging_server._log_setting
from otaclient_iot_logging_server._common import LogsQueue
from otaclient_iot_logging_server._common import LogGroupType, LogsQueue
from otaclient_iot_logging_server._log_setting import _LogTeeHandler # type: ignore

MODULE = otaclient_iot_logging_server._log_setting.__name__
Expand All @@ -39,5 +39,6 @@ def test_server_logger():
logger.removeHandler(_handler)
# ------ check result ------ #
_log = _queue.get_nowait()
assert _log[0] == suffix
assert _log[1]
assert _log[0] == LogGroupType.LOG
assert _log[1] == suffix
assert _log[2]
Loading
Loading