Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNM: feat!: migrate from REST to gRPC #24

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,24 @@ jobs:
# generate the wheel.
python-version: "3.8"

- name: Build wheel and calculate checksum
- name: Install build deps
run: |
python3 -m pip install -U pip
python3 -m pip install -U hatch

- name: Build otaclient IoT logging server package
run: |
python3 -m pip install -q -U pip
pip install -q -U hatch
hatch build -t wheel

- name: Build otaclient IoT logging server proto package
run: |
pushd proto
hatch build -t wheel
popd
cp proto/dist/*.whl dist

- name: Calculate checksum
run: |
for WHL in dist/*.whl; \
do \
sha256sum ${WHL} | sed -E "s@(\w+)\s+.*@sha256:\1@" > \
Expand Down
24 changes: 15 additions & 9 deletions proto/otaclient_iot_logging_server_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

syntax = "proto3";

service OtaClientLoggingService {
service OtaClientIoTLoggingService {
/*
* `PutLog` service requests OTA Client logging service to put log.
*/
Expand All @@ -25,6 +25,7 @@ enum LogType {
LOG = 0;
METRICS = 1;
}

enum LogLevel {
UNSPECIFIC = 0;
TRACE = 1;
Expand All @@ -34,18 +35,23 @@ enum LogLevel {
ERROR = 5;
FATAL = 6;
}

enum ErrorCode {
OK = 0;
FAILED = 1;
NO_FAILURE = 0; // Success
SERVER_QUEUE_FULL = 1; // Error: Server queue is full
NOT_ALLOWED_ECU_ID = 2; // Error: Specified ECU ID is not allowed
NO_MESSAGE = 3; // Error: No message in the request
}

message PutLogRequest {
LogType type = 1;
uint64 timestamp = 2;
LogLevel level = 3;
string data = 4;
string ecu_id = 1; // target ECU ID
LogType log_type = 2; // log type
uint64 timestamp = 3; // log timestamp (UNIX time in milliseconds)
LogLevel level = 4; // log level
string message = 5; // log message
}

message PutLogResponse {
ErrorCode code = 1;
string message = 2;
ErrorCode code = 1; // error code
string message = 2; // error message
}
2 changes: 1 addition & 1 deletion proto/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ proto_builds = [
[tool.black]
line-length = 88
target-version = [
'py38',
'py311',
]
extend-exclude = '''(
^.*(_pb2.pyi?|_pb2_grpc.pyi?)$
Expand Down
20 changes: 19 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ dynamic = [
"version",
]
dependencies = [
"aiohttp>=3.10.2,<3.11",
"awsiot-credentialhelper>=0.6,<0.7",
"boto3>=1.34.35,<1.35",
"botocore==1.34.35,<1.35",
"grpcio>=1.53.2,<1.69",
"protobuf>=4.21.12,<5.29",
"pydantic>=2.6,<3",
"pydantic-settings>=2.2.1,<3",
"pyyaml>=6.0.1,<7",
Expand Down Expand Up @@ -76,6 +77,15 @@ sources = [
"src",
]

[tool.black]
line-length = 88
target-version = [
'py311',
]
extend-exclude = '''(
^.*(_pb2.pyi?|_pb2_grpc.pyi?)$
)'''

[tool.ruff]
target-version = "py311"
# NOTE: not include tests and tools for now
Expand All @@ -84,6 +94,10 @@ include = [
"src/**/*.py",
"tests/**/*.py",
]
extend-exclude = [
"*_pb2.py*",
"*_pb2_grpc.py*",
]

lint.select = [
"A", # flake8-builtins
Expand Down Expand Up @@ -129,6 +143,10 @@ source = [
relative_files = true

[tool.coverage.report]
omit = [
"**/*_pb2.py*",
"**/*_pb2_grpc.py*",
]
exclude_also = [
"def __repr__",
"if __name__ == .__main__.:",
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# Automatically generated from pyproject.toml by gen_requirements_txt.py script.
# DO NOT EDIT! Only for reference use.
aiohttp>=3.10.2,<3.11
awsiot-credentialhelper>=0.6,<0.7
boto3>=1.34.35,<1.35
botocore==1.34.35,<1.35
grpcio>=1.53.2,<1.69
protobuf>=4.21.12,<5.29
pydantic>=2.6,<3
pydantic-settings>=2.2.1,<3
pyyaml>=6.0.1,<7
Expand Down
8 changes: 7 additions & 1 deletion src/otaclient_iot_logging_server/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@

from __future__ import annotations

from enum import Enum
from queue import Queue
from typing import Literal, TypedDict

from typing_extensions import NotRequired, TypeAlias

LogsQueue: TypeAlias = "Queue[tuple[str, LogMessage]]"
LogsQueue: TypeAlias = "Queue[tuple[LogGroupType, str, LogMessage]]"


class LogGroupType(Enum):
LOG = 0
METRICS = 1


class LogMessage(TypedDict):
Expand Down
8 changes: 4 additions & 4 deletions src/otaclient_iot_logging_server/_log_setting.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
import contextlib
import logging
import time
from queue import Queue

from otaclient_iot_logging_server import package_name as root_package_name
from otaclient_iot_logging_server._common import LogMessage
from otaclient_iot_logging_server._common import LogGroupType, LogMessage, LogsQueue
from otaclient_iot_logging_server.configs import server_cfg


Expand All @@ -30,7 +29,7 @@ class _LogTeeHandler(logging.Handler):

def __init__(
self,
queue: Queue[tuple[str, LogMessage]],
queue: LogsQueue,
logstream_suffix: str,
) -> None:
super().__init__()
Expand All @@ -41,6 +40,7 @@ def emit(self, record: logging.LogRecord) -> None:
with contextlib.suppress(Exception):
self._queue.put_nowait(
(
LogGroupType.LOG, # always put into log group
self._logstream_suffix,
LogMessage(
timestamp=int(time.time()) * 1000, # milliseconds
Expand All @@ -51,7 +51,7 @@ def emit(self, record: logging.LogRecord) -> None:


def config_logging(
queue: Queue[tuple[str, LogMessage]],
queue: LogsQueue,
*,
log_format: str,
level: str,
Expand Down
90 changes: 56 additions & 34 deletions src/otaclient_iot_logging_server/aws_iot_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
import awscrt.exceptions
from typing_extensions import NoReturn

from otaclient_iot_logging_server._common import LogEvent, LogMessage, LogsQueue
from otaclient_iot_logging_server._common import (
LogEvent,
LogGroupType,
LogMessage,
LogsQueue,
)
from otaclient_iot_logging_server._utils import retry
from otaclient_iot_logging_server.boto3_session import get_session
from otaclient_iot_logging_server.configs import server_cfg
Expand Down Expand Up @@ -68,40 +73,45 @@ def __init__(

self._session_config = session_config
self._log_group_name = session_config.aws_cloudwatch_log_group
self._metrics_group_name = session_config.aws_cloudwatch_metrics_log_group
self._interval = interval
self._queue: LogsQueue = queue
# NOTE: add this limitation to ensure all of the log_streams in a merge
# will definitely have entries less than MAX_LOGS_PER_PUT
self._max_logs_per_merge = min(max_logs_per_merge, self.MAX_LOGS_PER_PUT)

@retry(max_retry=16, backoff_factor=2, backoff_max=32)
def _create_log_group(self):
def _create_log_groups(self):
# TODO: (20240214) should we let the edge side iot_logging_server
# create the log group?
log_group_name, client = self._log_group_name, self._client
log_group_names = [self._log_group_name, self._metrics_group_name]
client = self._client
exc_types = self._exc_types
try:
client.create_log_group(logGroupName=log_group_name)
logger.info(f"{log_group_name=} has been created")
except exc_types.ResourceAlreadyExistsException as e:
logger.debug(
f"{log_group_name=} already existed, skip creating: {e.response}"
)
except ValueError as e:
if e.__cause__ and isinstance(e.__cause__, awscrt.exceptions.AwsCrtError):
logger.error(
(f"failed to create mtls connection to remote: {e.__cause__}")
for log_group_name in log_group_names:
try:
client.create_log_group(logGroupName=log_group_name)
logger.info(f"{log_group_name=} has been created")
except exc_types.ResourceAlreadyExistsException as e:
logger.debug(
f"{log_group_name=} already existed, skip creating: {e.response}"
)
raise e.__cause__ from None
logger.error(f"failed to create {log_group_name=}: {e!r}")
raise
except Exception as e:
logger.error(f"failed to create {log_group_name=}: {e!r}")
raise
except ValueError as e:
if e.__cause__ and isinstance(
e.__cause__, awscrt.exceptions.AwsCrtError
):
logger.error(
(f"failed to create mtls connection to remote: {e.__cause__}")
)
raise e.__cause__ from None
logger.error(f"failed to create {log_group_name=}: {e!r}")
raise
except Exception as e:
logger.error(f"failed to create {log_group_name=}: {e!r}")
raise

@retry(max_retry=16, backoff_factor=2, backoff_max=32)
def _create_log_stream(self, log_stream_name: str):
log_group_name, client = self._log_group_name, self._client
def _create_log_stream(self, log_group_name: str, log_stream_name: str):
client = self._client
exc_types = self._exc_types
try:
client.create_log_stream(
Expand All @@ -126,7 +136,9 @@ def _create_log_stream(self, log_stream_name: str):
raise

@retry(backoff_factor=2)
def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
def put_log_events(
self, log_group_name: str, log_stream_name: str, message_list: list[LogMessage]
):
"""
Ref:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html
Expand All @@ -137,7 +149,7 @@ def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
See the documentation for more details.
"""
request = LogEvent(
logGroupName=self._log_group_name,
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=message_list,
)
Expand All @@ -148,44 +160,54 @@ def put_log_events(self, log_stream_name: str, message_list: list[LogMessage]):
# logger.debug(f"successfully uploaded: {response}")
except exc_types.ResourceNotFoundException as e:
logger.debug(f"{log_stream_name=} not found: {e!r}")
self._create_log_stream(log_stream_name)
self._create_log_stream(log_group_name, log_stream_name)
raise
except Exception as e:
# NOTE: for unhandled exception, we just log it and ignore,
# leave for the developer to properly handle it
# in the future!
logger.error(
f"put_log_events failure: {e!r}\n"
f"log_group_name={self._log_group_name}, \n"
f"log_group_name={log_group_name}, \n"
f"log_stream_name={log_stream_name}"
)

def thread_main(self) -> NoReturn:
"""Main entry for running this iot_logger in a thread."""
# unconditionally create log_group and log_stream, do nothing if existed.
self._create_log_group()
self._create_log_groups()

while True:
# merge LogMessages into the same source, identified by
# log_stream_suffix.
message_dict: dict[str, list[LogMessage]] = defaultdict(list)
# log_group_type and log_stream_suffix.
            message_dict: dict[
                tuple[LogGroupType, str], list[LogMessage]
            ] = defaultdict(list)

_merge_count = 0
while _merge_count < self._max_logs_per_merge:
_queue = self._queue
try:
log_stream_suffix, message = _queue.get_nowait()
log_group_type, log_stream_suffix, message = _queue.get_nowait()
_merge_count += 1

message_dict[log_stream_suffix].append(message)
message_dict[(log_group_type, log_stream_suffix)].append(message)
except Empty:
break

for log_stream_suffix, logs in message_dict.items():
for (log_group_type, log_stream_suffix), logs in message_dict.items():
# get the log_group_name based on the log_group_type
log_group_name = (
self._metrics_group_name
if log_group_type == LogGroupType.METRICS
else self._log_group_name
)

with contextlib.suppress(Exception):
self.put_log_events(
log_group_name,
get_log_stream_name(
self._session_config.thing_name, log_stream_suffix
self._session_config.thing_name,
log_stream_suffix,
),
logs,
)
Expand Down
8 changes: 8 additions & 0 deletions src/otaclient_iot_logging_server/greengrass_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,14 @@ def aws_cloudwatch_log_group(self) -> str:
f"{self.account_id}/{self.profile}-edge-otaclient"
)

@computed_field
@property
def aws_cloudwatch_metrics_log_group(self) -> str:
return (
f"/aws/greengrass/edge/{self.region}/"
f"{self.account_id}/{self.profile}-edge-otaclient-metrics"
)

@computed_field
@property
def aws_credential_refresh_url(self) -> str:
Expand Down
Loading
Loading