Skip to content

Commit

Permalink
Minor changes to Materials CSV source
Browse files Browse the repository at this point in the history
- Move `MaterialSource` to `csv/base.py` because we don't have any other implementations so far,
- Update types in annotations,
- Add `delimiter` field to CSV Source (customers may use tabs, commas, etc.),
- Add additional validation logic when reading a file (a read will fail if a file does not contain a required column),
- Minor improvements and test changes.
  • Loading branch information
nikitabarskov committed Apr 8, 2024
1 parent cbf5145 commit b38d1f4
Showing 8 changed files with 179 additions and 100 deletions.
2 changes: 1 addition & 1 deletion src/volur/sdk/client.py
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
from pydantic import ValidationError
from pydantic_settings import BaseSettings, SettingsConfigDict
from volur.pork.materials.v1alpha3 import material_pb2, material_pb2_grpc
from volur.sdk.sources import MaterialSource
from volur.sdk.sources.csv.base import MaterialSource


class VolurClientSettings(BaseSettings):
13 changes: 0 additions & 13 deletions src/volur/sdk/sources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +0,0 @@
import abc
from typing import Iterator

from pydantic import BaseModel
from volur.pork.materials.v1alpha3 import material_pb2


class MaterialSource(BaseModel):
    """Base model describing the iteration protocol of a material source.

    Subclasses are expected to implement both methods so that the source
    can be consumed with a regular ``for`` loop.
    """

    @abc.abstractmethod
    def __iter__(self: "MaterialSource") -> Iterator[material_pb2.Material]:  # type: ignore[override]
        """Return an iterator over the ``material_pb2.Material`` objects."""

    @abc.abstractmethod
    def __next__(self: "MaterialSource") -> material_pb2.Material:
        """Return the next ``material_pb2.Material`` from the source."""
119 changes: 64 additions & 55 deletions src/volur/sdk/sources/csv/asynchronous.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
import csv
from pathlib import Path
from typing import Any, AsyncIterator, Iterator
from typing import AsyncIterator, Iterator

import aiofiles
from loguru import logger
from pydantic import Field
from volur.pork.materials.v1alpha3 import material_pb2
from volur.pork.materials.v1alpha3.material_pb2 import Material
from volur.pork.shared.v1alpha1.characteristic_pb2 import (
Characteristic,
)
from volur.pork.shared.v1alpha1.quantity_pb2 import Quantity
from volur.sdk.sources import MaterialSource
from volur.sdk.sources.csv.base import MaterialSource
from volur.sdk.sources.csv.shared import (
CharacteristicColumn,
Column,
QuantityColumn,
fetch_value,
load_characteristic_value,
load_quantity,
read,
)


@@ -28,8 +26,11 @@ def __iter__(self: "MaterialSource") -> Iterator[material_pb2.Material]: # type
def __next__(self: "MaterialSource") -> material_pb2.Material:
raise NotImplementedError

_data: AsyncIterator[Material] | None = None
_data: AsyncIterator[material_pb2.Material] | None = None
path: str
delimiter: str = Field(
",", description="The separator used in the CSV file. Default is comma."
)
material_id_column: Column = Field(
...,
description="""
@@ -62,8 +63,8 @@ def __next__(self: "MaterialSource") -> material_pb2.Material:
QuantityColumn(column_name="QUANTITY_LBS", unit="pound"),
],
)
characteristics_columns: list[CharacteristicColumn] | None = Field(
default=None,
characteristics_columns: list[CharacteristicColumn] = Field(
default_factory=list,
description="""
The column names used for getting the material
characteristics in the Material entity
@@ -87,65 +88,73 @@ def __next__(self: "MaterialSource") -> material_pb2.Material:
],
)

def __aiter__(self: "MaterialsCSVFileAsyncSource") -> "MaterialsCSVFileAsyncSource":
def __aiter__(
self: "MaterialsCSVFileAsyncSource",
) -> AsyncIterator[material_pb2.Material]:
self._data = self._load()
return self

async def __anext__(self: "MaterialsCSVFileAsyncSource") -> Material:
async def __anext__(
self: "MaterialsCSVFileAsyncSource",
) -> material_pb2.Material:
if self._data is None:
self._data = self._load()
data = await self._data.__anext__()
data = await anext(self._data)
if data is None:
raise StopAsyncIteration()
return data

async def _load(
self: "MaterialsCSVFileAsyncSource",
) -> AsyncIterator[Material]:
_ = Path(self.path)
if not _.exists():
raise ValueError("file does not exist")
if not _.is_file():
raise ValueError("path is not a file")
async with aiofiles.open(_, mode="r") as source:
content = await source.read()
reader: Iterator[dict[str, Any]] = csv.DictReader(content.splitlines())
for row in reader:
material = Material()
) -> AsyncIterator[material_pb2.Material]:
logger.info("reading data from a CSV file")
reader = await read(
self.path,
[
self.material_id_column,
self.plant_id_column,
self.quantity_column,
*self.characteristics_columns,
],
self.delimiter,
)
for _, row in enumerate(reader):
material = material_pb2.Material()
if (
value := fetch_value(
row,
self.material_id_column,
).value_string
) is not None:
material.material_id = value
if self.plant_id_column:
if (
value := fetch_value(
row,
self.material_id_column,
self.plant_id_column,
).value_string
) is not None:
material.material_id = value
if self.plant_id_column:
if (
value := fetch_value(
row,
self.plant_id_column,
).value_string
) is not None:
material.plant = value
if self.quantity_column:
quantity_value = load_quantity(
row.get(self.quantity_column.column_name),
self.quantity_column,
)
quantity = Quantity()
quantity.value.CopyFrom(quantity_value)
material.quantity.CopyFrom(quantity)
if self.characteristics_columns:
material.characteristics.extend(
[
Characteristic(
name=column.characteristic_name,
value=load_characteristic_value(
row.get(column.column_name),
column,
),
)
for column in self.characteristics_columns
]
)
yield material
material.plant = value
if self.quantity_column:
quantity_value = load_quantity(
row.get(self.quantity_column.column_name),
self.quantity_column,
)
quantity = Quantity()
quantity.value.CopyFrom(quantity_value)
material.quantity.CopyFrom(quantity)
if self.characteristics_columns:
material.characteristics.extend(
[
Characteristic(
name=column.characteristic_name,
value=load_characteristic_value(
row.get(column.column_name),
column,
),
)
for column in self.characteristics_columns
]
)
yield material
logger.info("finished reading data from a CSV file")
28 changes: 28 additions & 0 deletions src/volur/sdk/sources/csv/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import abc
from typing import AsyncIterator, Iterator

from pydantic import BaseModel
from volur.pork.materials.v1alpha3 import material_pb2


class MaterialSource(BaseModel):
    """
    Base class for material sources.

    This class is an abstract class that defines the interface every
    material CSV source exposes: the synchronous iterator protocol
    (``__iter__`` / ``__next__``) and the asynchronous iterator protocol
    (``__aiter__`` / ``__anext__``).
    """

    @abc.abstractmethod
    def __iter__(self: "MaterialSource") -> Iterator[material_pb2.Material]:  # type: ignore[override]
        """Return a synchronous iterator over the materials."""

    @abc.abstractmethod
    def __next__(self: "MaterialSource") -> material_pb2.Material:
        """Return the next material when iterating synchronously."""

    @abc.abstractmethod
    def __aiter__(self: "MaterialSource") -> AsyncIterator[material_pb2.Material]:
        """Return an asynchronous iterator over the materials."""

    @abc.abstractmethod
    async def __anext__(
        self: "MaterialSource",
    ) -> material_pb2.Material:
        """Return the next material when iterating asynchronously."""
78 changes: 62 additions & 16 deletions src/volur/sdk/sources/csv/shared.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
from typing import Literal
import csv
from pathlib import Path
from typing import Iterator, Literal

import aiofiles
from pydantic import BaseModel, Field
from volur.pork.shared.v1alpha1 import characteristic_pb2, quantity_pb2


class Column(BaseModel):
    """Describes a single CSV column: its header name and its data type."""

    # Header name used to look the cell up in each parsed row.
    column_name: str
    # Declared type of the cells; falls back to "string" when not given.
    data_type: Literal["string", "bool", "integer", "float", "datetime", "date"] = Field(
        default="string",
        description="The data type of the column",
    )


class CharacteristicColumn(Column):
    """A CSV column that is paired with a characteristic name."""

    # Name given to the characteristic built from this column's cells.
    characteristic_name: str


def load_characteristic_value(
value: str | None,
column: CharacteristicColumn,
) -> characteristic_pb2.CharacteristicValue:
if value is None:
raise ValueError("value is missing")
return characteristic_pb2.CharacteristicValue()
if column.data_type == "string":
return characteristic_pb2.CharacteristicValue(value_string=value)
elif column.data_type == "integer":
@@ -54,7 +57,7 @@ def load_quantity(
column: QuantityColumn,
) -> quantity_pb2.QuantityValue:
if value is None:
raise ValueError("value is missing")
return quantity_pb2.QuantityValue()
try:
if column.unit == "kilogram":
return quantity_pb2.QuantityValue(kilogram=float(value))
@@ -93,12 +96,55 @@ class Value(BaseModel):
def fetch_value(row: dict[str, str], column: Column) -> Value:
    """Read one cell from ``row`` and convert it to the column's data type.

    Args:
        row: A parsed CSV record mapping column names to raw cell strings.
        column: The column to read. ``column_name`` selects the cell and
            ``data_type`` selects the conversion.

    Returns:
        A ``Value`` with the field matching ``column.data_type`` set, or an
        empty ``Value`` when the row has no entry for the column.

    Raises:
        ValueError: If the cell cannot be converted to the declared type, or
            if the declared type has no conversion in this function.
    """
    value = row.get(column.column_name)
    if value is None:
        return Value()

    data_type = column.data_type
    if data_type == "string":
        return Value(value_string=value)
    if data_type == "integer":
        try:
            return Value(value_integer=int(value))
        except ValueError as error:
            raise ValueError(
                f"column {column.column_name} is not an integer"
            ) from error
    if data_type == "float":
        try:
            return Value(value_float=float(value))
        except ValueError as error:
            raise ValueError(f"column {column.column_name} is not an float") from error
    if data_type == "bool":
        lowered = value.lower()
        if lowered not in ("true", "false"):
            raise ValueError(f"column {column.column_name} is not an bool")
        # bool() of any non-empty string is True, so bool("false") would be
        # True; compare against the literal instead.
        return Value(value_bool=lowered == "true")
    raise ValueError(f"unknown data type {column.data_type}")


async def read(
    path: str,
    columns: list[Column | None],
    delimeter: str,
) -> Iterator[dict[str, str]]:
    """Load a CSV file and check that it contains the required columns.

    The whole file is read into memory before parsing, so the returned
    reader does not hold the file open.

    Args:
        path: Location of the CSV file.
        columns: Columns that must be present in the header; ``None``
            entries are ignored.
        delimeter: Field separator passed to ``csv.DictReader``.

    Returns:
        A ``csv.DictReader`` yielding one dict per data row.

    Raises:
        ValueError: If ``path`` does not exist, is not a regular file, or
            the header lacks one of the required columns (this includes an
            empty file, which has no header at all).
    """
    import io

    source_path = Path(path)
    if not source_path.exists():
        raise ValueError("file does not exist")
    if not source_path.is_file():
        raise ValueError("path is not a file")
    async with aiofiles.open(source_path, mode="r") as source:
        content = await source.read()
    # Feed the csv module a file-like object rather than str.splitlines():
    # splitlines() would break quoted fields that contain newlines and also
    # splits on characters such as form feed that are not row terminators.
    reader = csv.DictReader(io.StringIO(content), delimiter=delimeter)
    required_columns = {
        column.column_name for column in columns if column is not None
    }
    # fieldnames is None when the file has no header row (e.g. it is empty).
    columns_present_in_file = set(reader.fieldnames or ())
    missing_columns = required_columns - columns_present_in_file
    if missing_columns:
        # Sorted so the message is deterministic across runs.
        raise ValueError(
            f"missing columns in the csv file: {','.join(sorted(missing_columns))}"
        )
    return reader
8 changes: 5 additions & 3 deletions src/volur/sdk/sources/csv/synchronous.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import csv
from pathlib import Path
from typing import Any, AsyncIterator, Iterator
from typing import Any, AsyncIterator, Coroutine, Iterator

from pydantic import ConfigDict, Field
from volur.pork.materials.v1alpha3 import material_pb2
from volur.pork.shared.v1alpha1 import characteristic_pb2, quantity_pb2
from volur.sdk.sources import MaterialSource
from volur.sdk.sources.csv.base import MaterialSource
from volur.sdk.sources.csv.shared import (
CharacteristicColumn,
Column,
@@ -20,7 +20,9 @@ class MaterialsCSVFileSource(MaterialSource):
def __aiter__(self: "MaterialSource") -> AsyncIterator[material_pb2.Material]:
raise NotImplementedError

def __anext__(self: "MaterialSource") -> material_pb2.Material:
def __anext__(
self: "MaterialSource",
) -> Coroutine[None, None, material_pb2.Material]:
raise NotImplementedError

model_config = ConfigDict(
Loading

0 comments on commit b38d1f4

Please sign in to comment.