Skip to content

Commit

Permalink
implement elasticsearch online read
Browse files Browse the repository at this point in the history
fix formatting for readability

fix tests and expanded value proto parsing

add elasticsearch to repo_config
  • Loading branch information
piket committed Nov 2, 2023
1 parent a8eb0a5 commit c7c4e9d
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 33 deletions.
12 changes: 11 additions & 1 deletion .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ jobs:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python-version }}
steps:
- name: Increase swapfile
# Increase ubuntu's swapfile to avoid running out of resources which causes the action to terminate
if: startsWith(matrix.os, 'ubuntu')
run: |
sudo swapoff -a
sudo fallocate -l 15G /swapfile
sudo chmod 600 /swapfile
sudo mkswap /swapfile
sudo swapon /swapfile
sudo swapon --show
- uses: actions/checkout@v2
- name: Setup Python
id: setup-python
Expand Down Expand Up @@ -80,7 +90,7 @@ jobs:
- name: Install dependencies
run: make install-python-ci-dependencies
- name: Test Python
run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests
run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests -o log_cli=true

unit-test-go:
runs-on: ubuntu-latest
Expand Down
145 changes: 126 additions & 19 deletions sdk/python/feast/expediagroup/vectordb/elasticsearch_online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,57 @@
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

from bidict import bidict
from elasticsearch import Elasticsearch, helpers
from pydantic.typing import Literal

from feast import Entity, FeatureView, RepoConfig
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import (
BoolList,
BytesList,
DoubleList,
FloatList,
Int32List,
Int64List,
StringList,
)
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel
from feast.types import (
Array,
Bool,
Bytes,
ComplexFeastType,
FeastType,
Float32,
Float64,
Int32,
Int64,
PrimitiveFeastType,
String,
UnixTimestamp,
)

logger = logging.getLogger(__name__)

TYPE_MAPPING = bidict(
{
Bytes: "binary",
Int32: "integer",
Int64: "long",
Float32: "float",
Float64: "double",
Bool: "boolean",
String: "text",
UnixTimestamp: "date_nanos",
}
)
TYPE_MAPPING = {
Bytes: "binary",
Int32: "integer",
Int64: "long",
Float32: "float",
Float64: "double",
Bool: "boolean",
String: "text",
UnixTimestamp: "date_nanos",
Array(Bytes): "binary",
Array(Int32): "integer",
Array(Int64): "long",
Array(Float32): "float",
Array(Float64): "double",
Array(Bool): "boolean",
Array(String): "text",
Array(UnixTimestamp): "date_nanos",
}


class ElasticsearchOnlineStoreConfig(FeastConfigBaseModel):
Expand Down Expand Up @@ -108,7 +123,7 @@ def online_write_batch(
for feature_name, val in values.items():
document[feature_name] = self._get_value_from_value_proto(val)
bulk_documents.append(
{"_index": table.name, "_id": id_val, "doc": document}
{"_index": table.name, "_id": id_val, "_source": document}
)

successes, errors = helpers.bulk(client=es, actions=bulk_documents)
Expand All @@ -123,7 +138,49 @@ def online_read(
entity_keys: List[EntityKeyProto],
requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
pass
with ElasticsearchConnectionManager(config) as es:
id_list = []
for entity in entity_keys:
for val in entity.entity_values:
id_list.append(self._get_value_from_value_proto(val))

if requested_features is None:
requested_features = [f.name for f in table.schema]

hits = es.search(
index=table.name,
source=False,
fields=requested_features,
query={"ids": {"values": id_list}},
)["hits"]
if len(hits) > 0 and "hits" in hits:
hits = hits["hits"]
else:
return []

results: List[
Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]
] = []
prefix = "valuetype."
for hit in hits:
result_row = {}
doc = hit["fields"]
for feature in doc:
feast_type = next(
f.dtype for f in table.schema if f.name == feature
)
value = (
doc[feature][0]
if isinstance(feast_type, PrimitiveFeastType)
else doc[feature]
)
value_type_method = f"{feast_type.to_value_type()}_val".lower()
if value_type_method.startswith(prefix):
value_type_method = value_type_method[len(prefix) :]
value_proto = self._create_value_proto(value, value_type_method)
result_row[feature] = value_proto
results.append((None, result_row))
return results

def update(
self,
Expand Down Expand Up @@ -183,8 +240,6 @@ def _create_index(self, es, fv):
logger.info(f"Index {fv.name} created")

def _get_data_type(self, t: FeastType) -> str:
if isinstance(t, ComplexFeastType):
return "text"
return TYPE_MAPPING.get(t, "text")

def _get_value_from_value_proto(self, proto: ValueProto):
Expand All @@ -198,10 +253,62 @@ def _get_value_from_value_proto(self, proto: ValueProto):
value (Any): the extracted value.
"""
val_type = proto.WhichOneof("val")
if not val_type:
return None

value = getattr(proto, val_type) # type: ignore
if val_type == "bytes_val":
value = base64.b64encode(value).decode()
if val_type == "float_list_val":
if val_type == "bytes_list_val":
value = [base64.b64encode(v).decode() for v in value.val]
elif "_list_val" in val_type:
value = list(value.val)

return value

def _create_value_proto(self, feature_val, value_type) -> ValueProto:
"""
Construct Value Proto so that Feast can interpret Elasticsearch results
Parameters:
feature_val (Union[list, int, str, double, float, bool, bytes]): An item in the result that Elasticsearch returns.
value_type (Str): Feast Value type; example: int64_val, float_val, etc.
Returns:
val_proto (ValueProto): Constructed result that Feast can understand.
"""
if value_type == "bytes_list_val":
val_proto = ValueProto(
bytes_list_val=BytesList(val=[base64.b64decode(f) for f in feature_val])
)
elif value_type == "bytes_val":
val_proto = ValueProto(bytes_val=base64.b64decode(feature_val))
elif value_type == "string_list_val":
val_proto = ValueProto(string_list_val=StringList(val=feature_val))
elif value_type == "int32_list_val":
val_proto = ValueProto(int32_list_val=Int32List(val=feature_val))
elif value_type == "int64_list_val":
val_proto = ValueProto(int64_list_val=Int64List(val=feature_val))
elif value_type == "double_list_val":
val_proto = ValueProto(double_list_val=DoubleList(val=feature_val))
elif value_type == "float_list_val":
val_proto = ValueProto(float_list_val=FloatList(val=feature_val))
elif value_type == "bool_list_val":
val_proto = ValueProto(bool_list_val=BoolList(val=feature_val))
elif value_type == "unix_timestamp_list_val":
nanos_list = [
int(datetime.strptime(f, "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() * 1000)
for f in feature_val
]
val_proto = ValueProto(unix_timestamp_list_val=Int64List(val=nanos_list))
elif value_type == "unix_timestamp_val":
nanos = (
datetime.strptime(feature_val, "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()
* 1000
)
val_proto = ValueProto(unix_timestamp_val=int(nanos))
else:
val_proto = ValueProto()
setattr(val_proto, value_type, feature_val)

return val_proto
1 change: 1 addition & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"rockset": "feast.infra.online_stores.contrib.rockset_online_store.rockset.RocksetOnlineStore",
"hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore",
"milvus": "feast.expediagroup.vectordb.milvus_online_store.MilvusOnlineStore",
"elasticsearch": "feast.expediagroup.vectordb.elasticsearch_online_store.ElasticsearchOnlineStore",
}

OFFLINE_STORE_CLASS_FOR_TYPE = {
Expand Down
4 changes: 3 additions & 1 deletion sdk/python/pytest.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
[pytest]
markers =
universal_offline_stores: mark a test as using all offline stores.
universal_online_stores: mark a test as using all online stores.
universal_online_stores: mark a test as using all online stores.
env =
TC_MAX_TRIES=300
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
logger = logging.getLogger(__name__)


class ElasticsearchOnlineCreator(OnlineStoreCreator):
class ElasticsearchOnlineStoreCreator(OnlineStoreCreator):
def __init__(self, project_name: str, es_port: int):
super().__init__(project_name)
self.elasticsearch_container = ElasticSearchContainer(
Expand Down
Loading

0 comments on commit c7c4e9d

Please sign in to comment.